]> git.meshlink.io Git - meshlink/commitdiff
Add support for recvmmsg().
authorGuus Sliepen <guus@meshlink.io>
Mon, 6 Apr 2020 20:24:52 +0000 (22:24 +0200)
committerGuus Sliepen <guus@meshlink.io>
Thu, 16 Apr 2020 00:18:10 +0000 (02:18 +0200)
Performance analysis with Linux's perf tool has shown that while
transferring a stream of data, MeshLink's main thread spends 66% of the
time in the system calls select(), sendto() and recvfrom() combined. One
way to reduce that is to handle more than one packet at once with a
single system call.

This commit makes MeshLink use recvmmsg() if available.

configure.ac
src/meshlink.c
src/meshlink_internal.h
src/net.h
src/net_packet.c

index b1b1118d138c1c2b1e5f47d55bbe4c3c2a22789b..45e83b94cbb1d790fadf6e9fdb77c4d5005963e1 100644 (file)
@@ -126,7 +126,7 @@ MeshLink_ATTRIBUTE(__warn_unused_result__)
 
 dnl Checks for library functions.
 AC_TYPE_SIGNAL
-AC_CHECK_FUNCS([asprintf fchmod fork gettimeofday random pselect select setns strdup usleep getifaddrs freeifaddrs],
+AC_CHECK_FUNCS([asprintf fchmod fork gettimeofday random pselect recvmmsg select setns strdup usleep getifaddrs freeifaddrs],
   [], [], [#include "$srcdir/src/have.h"]
 )
 
index b85795d465e3c64c59bc6c0990f3deb60dfd3f86..f14c4940800b587dcb8f14688aabd9a43ac41c0e 100644 (file)
@@ -1652,6 +1652,7 @@ bool meshlink_start(meshlink_handle_t *mesh) {
                return false;
        }
 
+       init_mmsg(mesh);
        init_outgoings(mesh);
        init_adns(mesh);
 
@@ -1726,6 +1727,7 @@ void meshlink_stop(meshlink_handle_t *mesh) {
 
        exit_adns(mesh);
        exit_outgoings(mesh);
+       exit_mmsg(mesh);
 
        // Ensure we are considered unreachable
        if(mesh->nodes) {
index 8ba4e9801a6a743e47122fe6332e69baca3f691f..2a9f74bf2b0c6fc58639e56599d799295e93c863 100644 (file)
@@ -92,6 +92,10 @@ struct meshlink_handle {
        meshlink_log_cb_t log_cb;
        meshlink_log_level_t log_level;
        void *packet;
+#ifdef HAVE_RECVMMSG
+       struct mmsgs *in_mmsgs;
+       struct mmsgs *out_mmsgs;
+#endif
 
        // The most important network-related members come first
        int reachable;
index 5ea2ae3ed28bd88c036e0fd3076b53dfe689d9a6..a198dfe40065fce25fe1fc6f89307b9a503a33d1 100644 (file)
--- a/src/net.h
+++ b/src/net.h
@@ -82,6 +82,10 @@ typedef struct outgoing_t {
 
 extern void init_outgoings(struct meshlink_handle *mesh);
 extern void exit_outgoings(struct meshlink_handle *mesh);
+#ifdef HAVE_RECVMMSG
+extern void init_mmsg(struct meshlink_handle *mesh);
+extern void exit_mmsg(struct meshlink_handle *mesh);
+#endif
 
 extern void retry_outgoing(struct meshlink_handle *mesh, outgoing_t *);
 extern void handle_incoming_vpn_data(struct event_loop_t *loop, void *, int);
index 56012e0e36a887c9bc2e45eaebaf13e3484d4272..4d3b20020e6deb381fb6d701d749469ebc9a1027 100644 (file)
@@ -226,6 +226,45 @@ static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet
 
 /* VPN packet I/O */
 
+#ifdef HAVE_RECVMMSG
+#define MAX_MMSG 16
+
+struct mmsgs {
+       struct mmsghdr hdrs[MAX_MMSG];
+       struct iovec iovs[MAX_MMSG];
+       sockaddr_t addrs[MAX_MMSG];
+       vpn_packet_t pkts[MAX_MMSG];
+};
+
+static void init_mmsg_array(struct mmsgs *mmsgs) {
+       for(int i = 0; i < MAX_MMSG; i++) {
+               mmsgs->hdrs[i].msg_hdr.msg_name = &mmsgs->addrs[i];
+               mmsgs->hdrs[i].msg_hdr.msg_namelen = sizeof(mmsgs->addrs[i]);
+               mmsgs->hdrs[i].msg_hdr.msg_iov = &mmsgs->iovs[i];
+               mmsgs->hdrs[i].msg_hdr.msg_iovlen = 1;
+               mmsgs->iovs[i].iov_base = mmsgs->pkts[i].data;
+               mmsgs->iovs[i].iov_len = MAXSIZE;
+       }
+}
+
+void init_mmsg(meshlink_handle_t *mesh) {
+       mesh->in_mmsgs = xzalloc(sizeof(*mesh->in_mmsgs));
+       mesh->out_mmsgs = xzalloc(sizeof(*mesh->out_mmsgs));
+
+       init_mmsg_array(mesh->in_mmsgs);
+       init_mmsg_array(mesh->out_mmsgs);
+
+}
+
+void exit_mmsg(meshlink_handle_t *mesh) {
+       free(mesh->out_mmsgs);
+       free(mesh->in_mmsgs);
+
+       mesh->out_mmsgs = NULL;
+       mesh->in_mmsgs = NULL;
+}
+#endif
+
 static void receive_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet) {
        logger(mesh, MESHLINK_DEBUG, "Received packet of %d bytes from %s", packet->len, n->name);
 
@@ -533,42 +572,18 @@ static node_t *try_harder(meshlink_handle_t *mesh, const sockaddr_t *from, const
        return n;
 }
 
-void handle_incoming_vpn_data(event_loop_t *loop, void *data, int flags) {
-       (void)flags;
-       meshlink_handle_t *mesh = loop->data;
-       listen_socket_t *ls = data;
-       vpn_packet_t pkt;
-       char *hostname;
-       sockaddr_t from;
-       socklen_t fromlen = sizeof(from);
-       node_t *n;
-       int len;
-
-       memset(&from, 0, sizeof(from));
-
-       len = recvfrom(ls->udp.fd, pkt.data, MAXSIZE, 0, &from.sa, &fromlen);
-
-       if(len <= 0 || len > MAXSIZE) {
-               if(!sockwouldblock(sockerrno)) {
-                       logger(mesh, MESHLINK_ERROR, "Receiving packet failed: %s", sockstrerror(sockerrno));
-               }
-
-               return;
-       }
-
-       pkt.len = len;
-
-       sockaddrunmap(&from); /* Some braindead IPv6 implementations do stupid things. */
+static void handle_one_incoming(meshlink_handle_t *mesh, listen_socket_t *ls, vpn_packet_t *pkt, sockaddr_t *from) {
+       sockaddrunmap(from); /* Some braindead IPv6 implementations do stupid things. */
 
-       n = lookup_node_udp(mesh, &from);
+       node_t *n = lookup_node_udp(mesh, from);
 
        if(!n) {
-               n = try_harder(mesh, &from, &pkt);
+               n = try_harder(mesh, from, pkt);
 
                if(n) {
-                       update_node_udp(mesh, n, &from);
+                       update_node_udp(mesh, n, from);
                } else if(mesh->log_level <= MESHLINK_WARNING) {
-                       hostname = sockaddr2hostname(&from);
+                       char *hostname = sockaddr2hostname(from);
                        logger(mesh, MESHLINK_WARNING, "Received UDP packet from unknown source %s", hostname);
                        free(hostname);
                        return;
@@ -584,5 +599,49 @@ void handle_incoming_vpn_data(event_loop_t *loop, void *data, int flags) {
 
        n->sock = ls - mesh->listen_socket;
 
-       receive_udppacket(mesh, n, &pkt);
+       receive_udppacket(mesh, n, pkt);
+}
+
+void handle_incoming_vpn_data(event_loop_t *loop, void *data, int flags) {
+       (void)flags;
+       meshlink_handle_t *mesh = loop->data;
+       listen_socket_t *ls = data;
+
+#ifdef HAVE_RECVMMSG
+       struct mmsgs *mmsgs = mesh->in_mmsgs;
+       int count = recvmmsg(ls->udp.fd, mmsgs->hdrs, MAX_MMSG, 0, NULL);
+
+       if(count <= 0 || count > MAX_MMSG) {
+               if(!sockwouldblock(sockerrno)) {
+                       logger(mesh, MESHLINK_ERROR, "Receiving packets failed: %s", sockstrerror(sockerrno));
+               }
+
+               return;
+       }
+
+       for(int i = 0; i < count; i++) {
+               mmsgs->pkts[i].len = mmsgs->hdrs[i].msg_len;
+               handle_one_incoming(mesh, ls, &mmsgs->pkts[i], &mmsgs->addrs[i]);
+               mmsgs->hdrs[i].msg_hdr.msg_namelen = sizeof(mmsgs->addrs[i]);
+       }
+
+#else
+       vpn_packet_t pkt;
+       sockaddr_t from;
+       socklen_t fromlen = sizeof(from);
+       memset(&from, 0, sizeof(from));
+
+       ssize_t len = recvfrom(ls->udp.fd, pkt.data, MAXSIZE, 0, &from.sa, &fromlen);
+
+       if(len <= 0 || len > MAXSIZE) {
+               if(!sockwouldblock(sockerrno)) {
+                       logger(mesh, MESHLINK_ERROR, "Receiving packet failed: %s", sockstrerror(sockerrno));
+               }
+
+               return;
+       }
+
+       pkt.len = len;
+       handle_one_incoming(mesh, ls, &pkt, &from);
+#endif
 }