]> git.meshlink.io Git - meshlink/blobdiff - src/net_packet.c
Add support for sendmmsg().
[meshlink] / src / net_packet.c
index 4d3b20020e6deb381fb6d701d749469ebc9a1027..6a1d28a3392e0a8bd2e87793d1819684eb656ddb 100644 (file)
@@ -226,10 +226,13 @@ static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet
 
 /* VPN packet I/O */
 
-#ifdef HAVE_RECVMMSG
-#define MAX_MMSG 16
+#if defined(HAVE_RECVMMSG) || defined(HAVE_SENDMMSG)
+#define MAX_MMSG 32
 
 struct mmsgs {
+       int count;
+       int offset;
+       int fd;
        struct mmsghdr hdrs[MAX_MMSG];
        struct iovec iovs[MAX_MMSG];
        sockaddr_t addrs[MAX_MMSG];
@@ -237,6 +240,9 @@ struct mmsgs {
 };
 
 static void init_mmsg_array(struct mmsgs *mmsgs) {
+       mmsgs->count = 0;
+       mmsgs->offset = 0;
+
        for(int i = 0; i < MAX_MMSG; i++) {
                mmsgs->hdrs[i].msg_hdr.msg_name = &mmsgs->addrs[i];
                mmsgs->hdrs[i].msg_hdr.msg_namelen = sizeof(mmsgs->addrs[i]);
@@ -253,7 +259,6 @@ void init_mmsg(meshlink_handle_t *mesh) {
 
        init_mmsg_array(mesh->in_mmsgs);
        init_mmsg_array(mesh->out_mmsgs);
-
 }
 
 void exit_mmsg(meshlink_handle_t *mesh) {
@@ -265,6 +270,64 @@ void exit_mmsg(meshlink_handle_t *mesh) {
 }
 #endif
 
+#ifdef HAVE_SENDMMSG
+void flush_mmsg(meshlink_handle_t *mesh) {
+       struct mmsgs *mmsgs = mesh->out_mmsgs;
+
+       int todo = mmsgs->count - mmsgs->offset;
+       int offset = mmsgs->offset;
+
+       while(todo) {
+               int result = sendmmsg(mmsgs->fd, mmsgs->hdrs + offset, todo, 0);
+
+               if(result <= 0) {
+                       logger(mesh, MESHLINK_WARNING, "Error sending packet: %s", sockstrerror(errno));
+                       break;
+               }
+
+               todo -= result;
+               offset += result;
+       }
+
+       mmsgs->count = 0;
+       mmsgs->offset = 0;
+}
+
+static vpn_packet_t *get_next_mmsg_pkt(meshlink_handle_t *mesh) {
+       struct mmsgs *mmsgs = mesh->out_mmsgs;
+
+       if(mmsgs->count >= MAX_MMSG) {
+               flush_mmsg(mesh);
+       }
+
+       return &mmsgs->pkts[mmsgs->count];
+}
+
+static void add_mmsg(meshlink_handle_t *mesh, int fd, const void *data, size_t len, const struct sockaddr *sa, socklen_t salen) {
+       struct mmsgs *mmsgs = mesh->out_mmsgs;
+       assert(mmsgs->count < MAX_MMSG);
+       assert(data == get_next_mmsg_pkt(mesh)->data);
+
+       if(mmsgs->fd != fd) {
+               // Flush all packets from the previous fd
+               int oldcount = mmsgs->count;
+               flush_mmsg(mesh);
+
+               // Adjust offset and count to start the next flush with this packet
+               mmsgs->fd = fd;
+               mmsgs->count = oldcount;
+               mmsgs->offset = oldcount;
+       }
+
+       assert(mmsgs->iovs[mmsgs->count].iov_base == mmsgs->pkts[mmsgs->count].data);
+       assert(mmsgs->hdrs[mmsgs->count].msg_hdr.msg_iovlen == 1);
+       mmsgs->iovs[mmsgs->count].iov_len = len;
+       memcpy(mmsgs->hdrs[mmsgs->count].msg_hdr.msg_name, sa, salen);
+       mmsgs->hdrs[mmsgs->count].msg_hdr.msg_namelen = salen;
+       mmsgs->count++;
+}
+#endif
+
 static void receive_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet) {
        logger(mesh, MESHLINK_DEBUG, "Received packet of %d bytes from %s", packet->len, n->name);
 
@@ -318,6 +381,10 @@ static void send_sptps_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *
 
        uint8_t type = 0;
 
+#ifdef HAVE_SENDMMSG
+       sptps_set_send_buffer(&n->sptps, get_next_mmsg_pkt(mesh)->data, MAXSIZE);
+#endif
+
        // If it's a probe, send it immediately without trying to compress it.
        if(origpkt->probe) {
                sptps_send_record(&n->sptps, PKT_PROBE, origpkt->data, origpkt->len);
@@ -441,6 +508,15 @@ bool send_sptps_data(void *handle, uint8_t type, const void *data, size_t len) {
                choose_udp_address(mesh, to, &sa, &sock);
        }
 
+#ifdef HAVE_SENDMMSG
+
+       if(type != PKT_PROBE) {
+               add_mmsg(mesh, mesh->listen_socket[sock].udp.fd, data, len, &sa->sa, SALEN(sa->sa));
+               return true;
+       }
+
+#endif
+
        if(sendto(mesh->listen_socket[sock].udp.fd, data, len, 0, &sa->sa, SALEN(sa->sa)) < 0 && !sockwouldblock(sockerrno)) {
                if(sockmsgsize(sockerrno)) {
                        if(to->maxmtu >= len) {