]> git.meshlink.io Git - meshlink/blobdiff - src/net_packet.c
Add support for sendmmsg().
[meshlink] / src / net_packet.c
index d1cbf47d91397ed08f0e87a61131aefac8a99981..6a1d28a3392e0a8bd2e87793d1819684eb656ddb 100644 (file)
@@ -70,7 +70,7 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
        if(n->mtuprobes > 32) {
                if(!n->minmtu) {
                        n->mtuprobes = 31;
-                       timeout = mesh->pinginterval;
+                       timeout = mesh->dev_class_traits[n->devclass].pinginterval;
                        goto end;
                }
 
@@ -79,6 +79,8 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
                n->mtuprobes = 1;
                n->minmtu = 0;
                n->maxmtu = MTU;
+
+               update_node_pmtu(mesh, n);
        }
 
        if(n->mtuprobes >= 10 && n->mtuprobes < 32 && !n->minmtu) {
@@ -89,6 +91,7 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
        if(n->mtuprobes == 30 || (n->mtuprobes < 30 && n->minmtu >= n->maxmtu)) {
                if(n->minmtu > n->maxmtu) {
                        n->minmtu = n->maxmtu;
+                       update_node_pmtu(mesh, n);
                } else {
                        n->maxmtu = n->minmtu;
                }
@@ -99,10 +102,16 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
        }
 
        if(n->mtuprobes == 31) {
-               timeout = mesh->pinginterval;
+               if(!n->minmtu && n->status.want_udp) {
+                       /* Send a dummy ANS_KEY to try to update the reflexive UDP address */
+                       send_request(mesh, n->nexthop->connection, NULL, "%d %s %s . -1 -1 -1 0", ANS_KEY, mesh->self->name, n->name);
+                       n->status.want_udp = false;
+               }
+
+               timeout = mesh->dev_class_traits[n->devclass].pinginterval;
                goto end;
        } else if(n->mtuprobes == 32) {
-               timeout = mesh->pingtimeout;
+               timeout = mesh->dev_class_traits[n->devclass].pingtimeout;
        }
 
        for(int i = 0; i < 5; i++) {
@@ -117,7 +126,7 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
                } else if(n->maxmtu <= n->minmtu) {
                        len = n->maxmtu;
                } else {
-                       len = n->minmtu + 1 + rand() % (n->maxmtu - n->minmtu);
+                       len = n->minmtu + 1 + prng(mesh, n->maxmtu - n->minmtu);
                }
 
                if(len < 64) {
@@ -139,19 +148,24 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
        n->status.broadcast = false;
 
 end:
-       timeout_set(&mesh->loop, &n->mtutimeout, &(struct timeval) {
-               timeout, rand() % 100000
+       timeout_set(&mesh->loop, &n->mtutimeout, &(struct timespec) {
+               timeout, prng(mesh, TIMER_FUDGE)
        });
 }
 
 void send_mtu_probe(meshlink_handle_t *mesh, node_t *n) {
-       timeout_add(&mesh->loop, &n->mtutimeout, send_mtu_probe_handler, n, &(struct timeval) {
+       timeout_add(&mesh->loop, &n->mtutimeout, send_mtu_probe_handler, n, &(struct timespec) {
                1, 0
        });
        send_mtu_probe_handler(&mesh->loop, n);
 }
 
 static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet, uint16_t len) {
+       if(len < 64) {
+               logger(mesh, MESHLINK_WARNING, "Got too short MTU probe length %d from %s", packet->len, n->name);
+               return;
+       }
+
        logger(mesh, MESHLINK_DEBUG, "Got MTU probe length %d from %s", packet->len, n->name);
 
        if(!packet->data[0]) {
@@ -171,7 +185,14 @@ static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet
                   is possible using the address and socket that the reply
                   packet used. */
 
-               n->status.udp_confirmed = true;
+               if(!n->status.udp_confirmed) {
+                       char *address, *port;
+                       sockaddr2str(&n->address, &address, &port);
+                       send_request(mesh, n->nexthop->connection, NULL, "%d %s %s . -1 -1 -1 0 %s %s", ANS_KEY, n->name, n->name, address, port);
+                       free(address);
+                       free(port);
+                       n->status.udp_confirmed = true;
+               }
 
                /* If we haven't established the PMTU yet, restart the discovery process. */
 
@@ -198,12 +219,115 @@ static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet
 
                if(n->minmtu < len) {
                        n->minmtu = len;
+                       update_node_pmtu(mesh, n);
                }
        }
 }
 
 /* VPN packet I/O */
 
+#if defined(HAVE_RECVMMSG) || defined(HAVE_SENDMMSG)
+#define MAX_MMSG 32
+
+struct mmsgs {
+       int count;
+       int offset;
+       int fd;
+       struct mmsghdr hdrs[MAX_MMSG];
+       struct iovec iovs[MAX_MMSG];
+       sockaddr_t addrs[MAX_MMSG];
+       vpn_packet_t pkts[MAX_MMSG];
+};
+
+static void init_mmsg_array(struct mmsgs *mmsgs) {
+       mmsgs->count = 0;
+       mmsgs->offset = 0;
+
+       for(int i = 0; i < MAX_MMSG; i++) {
+               mmsgs->hdrs[i].msg_hdr.msg_name = &mmsgs->addrs[i];
+               mmsgs->hdrs[i].msg_hdr.msg_namelen = sizeof(mmsgs->addrs[i]);
+               mmsgs->hdrs[i].msg_hdr.msg_iov = &mmsgs->iovs[i];
+               mmsgs->hdrs[i].msg_hdr.msg_iovlen = 1;
+               mmsgs->iovs[i].iov_base = mmsgs->pkts[i].data;
+               mmsgs->iovs[i].iov_len = MAXSIZE;
+       }
+}
+
+void init_mmsg(meshlink_handle_t *mesh) {
+       mesh->in_mmsgs = xzalloc(sizeof(*mesh->in_mmsgs));
+       mesh->out_mmsgs = xzalloc(sizeof(*mesh->out_mmsgs));
+
+       init_mmsg_array(mesh->in_mmsgs);
+       init_mmsg_array(mesh->out_mmsgs);
+}
+
+void exit_mmsg(meshlink_handle_t *mesh) {
+       free(mesh->out_mmsgs);
+       free(mesh->in_mmsgs);
+
+       mesh->out_mmsgs = NULL;
+       mesh->in_mmsgs = NULL;
+}
+#endif
+
+#ifdef HAVE_SENDMMSG
+void flush_mmsg(meshlink_handle_t *mesh) {
+       struct mmsgs *mmsgs = mesh->out_mmsgs;
+
+       int todo = mmsgs->count - mmsgs->offset;
+       int offset = mmsgs->offset;
+
+       while(todo) {
+               int result = sendmmsg(mmsgs->fd, mmsgs->hdrs + offset, todo, 0);
+
+               if(result <= 0) {
+                       logger(mesh, MESHLINK_WARNING, "Error sending packet: %s", sockstrerror(errno));
+                       break;
+               }
+
+               todo -= result;
+               offset += result;
+       }
+
+       mmsgs->count = 0;
+       mmsgs->offset = 0;
+}
+
+static vpn_packet_t *get_next_mmsg_pkt(meshlink_handle_t *mesh) {
+       struct mmsgs *mmsgs = mesh->out_mmsgs;
+
+       if(mmsgs->count >= MAX_MMSG) {
+               flush_mmsg(mesh);
+       }
+
+       return &mmsgs->pkts[mmsgs->count];
+}
+
+static void add_mmsg(meshlink_handle_t *mesh, int fd, const void *data, size_t len, const struct sockaddr *sa, socklen_t salen) {
+       struct mmsgs *mmsgs = mesh->out_mmsgs;
+       assert(mmsgs->count < MAX_MMSG);
+       assert(data == get_next_mmsg_pkt(mesh)->data);
+
+       if(mmsgs->fd != fd) {
+               // Flush all packets from the previous fd
+               int oldcount = mmsgs->count;
+               flush_mmsg(mesh);
+
+               // Adjust offset and count to start the next flush with this packet
+               mmsgs->fd = fd;
+               mmsgs->count = oldcount;
+               mmsgs->offset = oldcount;
+       }
+
+       assert(mmsgs->iovs[mmsgs->count].iov_base == mmsgs->pkts[mmsgs->count].data);
+       assert(mmsgs->hdrs[mmsgs->count].msg_hdr.msg_iovlen == 1);
+       mmsgs->iovs[mmsgs->count].iov_len = len;
+       memcpy(mmsgs->hdrs[mmsgs->count].msg_hdr.msg_name, sa, salen);
+       mmsgs->hdrs[mmsgs->count].msg_hdr.msg_namelen = salen;
+       mmsgs->count++;
+}
+#endif
+
 static void receive_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet) {
        logger(mesh, MESHLINK_DEBUG, "Received packet of %d bytes from %s", packet->len, n->name);
 
@@ -234,7 +358,9 @@ static void receive_udppacket(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *
                return;
        }
 
-       sptps_receive_data(&n->sptps, inpkt->data, inpkt->len);
+       if(!sptps_receive_data(&n->sptps, inpkt->data, inpkt->len)) {
+               logger(mesh, MESHLINK_ERROR, "Could not process SPTPS data from %s: %s", n->name, strerror(errno));
+       }
 }
 
 static void send_sptps_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *origpkt) {
@@ -255,6 +381,10 @@ static void send_sptps_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *
 
        uint8_t type = 0;
 
+#ifdef HAVE_SENDMMSG
+       sptps_set_send_buffer(&n->sptps, get_next_mmsg_pkt(mesh)->data, MAXSIZE);
+#endif
+
        // If it's a probe, send it immediately without trying to compress it.
        if(origpkt->probe) {
                sptps_send_record(&n->sptps, PKT_PROBE, origpkt->data, origpkt->len);
@@ -284,11 +414,17 @@ static void choose_udp_address(meshlink_handle_t *mesh, const node_t *n, const s
                return;
        }
 
+       /* If we have learned an address via Catta, try this once every batch */
+       if(mesh->udp_choice == 1 && n->catta_address.sa.sa_family != AF_UNSPEC) {
+               *sa = &n->catta_address;
+               goto check_socket;
+       }
+
        /* Otherwise, address are found in edges to this node.
           So we pick a random edge and a random socket. */
 
        int i = 0;
-       int j = rand() % n->edge_tree->count;
+       int j = prng(mesh, n->edge_tree->count);
        edge_t *candidate = NULL;
 
        for splay_each(edge_t, e, n->edge_tree) {
@@ -300,9 +436,11 @@ static void choose_udp_address(meshlink_handle_t *mesh, const node_t *n, const s
 
        if(candidate) {
                *sa = &candidate->address;
-               *sock = rand() % mesh->listen_sockets;
+               *sock = prng(mesh, mesh->listen_sockets);
        }
 
+check_socket:
+
        /* Make sure we have a suitable socket for the chosen address */
        if(mesh->listen_socket[*sock].sa.sa.sa_family != (*sa)->sa.sa_family) {
                for(int i = 0; i < mesh->listen_sockets; i++) {
@@ -315,32 +453,16 @@ static void choose_udp_address(meshlink_handle_t *mesh, const node_t *n, const s
 }
 
 static void choose_broadcast_address(meshlink_handle_t *mesh, const node_t *n, const sockaddr_t **sa, int *sock) {
-       static sockaddr_t broadcast_ipv4 = {
-               .in = {
-                       .sin_family = AF_INET,
-                       .sin_addr.s_addr = -1,
-               }
-       };
-
-       static sockaddr_t broadcast_ipv6 = {
-               .in6 = {
-                       .sin6_family = AF_INET6,
-                       .sin6_addr.s6_addr[0x0] = 0xff,
-                       .sin6_addr.s6_addr[0x1] = 0x02,
-                       .sin6_addr.s6_addr[0xf] = 0x01,
-               }
-       };
+       *sock = prng(mesh, mesh->listen_sockets);
+       sockaddr_t *broadcast_sa = &mesh->listen_socket[*sock].broadcast_sa;
 
-       *sock = rand() % mesh->listen_sockets;
-
-       if(mesh->listen_socket[*sock].sa.sa.sa_family == AF_INET6) {
-               broadcast_ipv6.in6.sin6_port = n->prevedge->address.in.sin_port;
-               broadcast_ipv6.in6.sin6_scope_id = mesh->listen_socket[*sock].sa.in6.sin6_scope_id;
-               *sa = &broadcast_ipv6;
+       if(broadcast_sa->sa.sa_family == AF_INET6) {
+               broadcast_sa->in6.sin6_port = n->prevedge->address.in.sin_port;
        } else {
-               broadcast_ipv4.in.sin_port = n->prevedge->address.in.sin_port;
-               *sa = &broadcast_ipv4;
+               broadcast_sa->in.sin_port = n->prevedge->address.in.sin_port;
        }
+
+       *sa = broadcast_sa;
 }
 
 static void send_udppacket(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *origpkt) {
@@ -353,12 +475,16 @@ static void send_udppacket(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *ori
 }
 
 bool send_sptps_data(void *handle, uint8_t type, const void *data, size_t len) {
+       assert(handle);
+       assert(data);
+       assert(len);
+
        node_t *to = handle;
        meshlink_handle_t *mesh = to->mesh;
 
        /* Send it via TCP if it is a handshake packet, TCPOnly is in use, or this packet is larger than the MTU. */
 
-       if(type >= SPTPS_HANDSHAKE || (type != PKT_PROBE && len > to->minmtu)) {
+       if(type >= SPTPS_HANDSHAKE || (type != PKT_PROBE && (len - 21) > to->minmtu)) {
                char buf[len * 4 / 3 + 5];
                b64encode(data, buf, len);
 
@@ -382,6 +508,15 @@ bool send_sptps_data(void *handle, uint8_t type, const void *data, size_t len) {
                choose_udp_address(mesh, to, &sa, &sock);
        }
 
+#ifdef HAVE_SENDMMSG
+
+       if(type != PKT_PROBE) {
+               add_mmsg(mesh, mesh->listen_socket[sock].udp.fd, data, len, &sa->sa, SALEN(sa->sa));
+               return true;
+       }
+
+#endif
+
        if(sendto(mesh->listen_socket[sock].udp.fd, data, len, 0, &sa->sa, SALEN(sa->sa)) < 0 && !sockwouldblock(sockerrno)) {
                if(sockmsgsize(sockerrno)) {
                        if(to->maxmtu >= len) {
@@ -401,6 +536,9 @@ bool send_sptps_data(void *handle, uint8_t type, const void *data, size_t len) {
 }
 
 bool receive_sptps_record(void *handle, uint8_t type, const void *data, uint16_t len) {
+       assert(handle);
+       assert(!data || len);
+
        node_t *from = handle;
        meshlink_handle_t *mesh = from->mesh;
 
@@ -418,8 +556,8 @@ bool receive_sptps_record(void *handle, uint8_t type, const void *data, uint16_t
                return true;
        }
 
-       if(len > MTU) {
-               logger(mesh, MESHLINK_ERROR, "Packet from %s larger than maximum supported size (%d > %d)", from->name, len, MTU);
+       if(len > MAXSIZE) {
+               logger(mesh, MESHLINK_ERROR, "Packet from %s larger than maximum supported size (%d > %d)", from->name, len, MAXSIZE);
                return false;
        }
 
@@ -472,27 +610,12 @@ void send_packet(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet) {
 
        n->out_packets++;
        n->out_bytes += packet->len;
+       n->status.want_udp = true;
 
        send_sptps_packet(mesh, n, packet);
        return;
 }
 
-/* Broadcast a packet using the minimum spanning tree */
-
-void broadcast_packet(meshlink_handle_t *mesh, const node_t *from, vpn_packet_t *packet) {
-       // Always give ourself a copy of the packet.
-       if(from != mesh->self) {
-               send_packet(mesh, mesh->self, packet);
-       }
-
-       logger(mesh, MESHLINK_INFO, "Broadcasting packet of %d bytes from %s", packet->len, from->name);
-
-       for list_each(connection_t, c, mesh->connections)
-               if(c->status.active && c->status.mst && c != from->nexthop->connection) {
-                       send_packet(mesh, c->node, packet);
-               }
-}
-
 static node_t *try_harder(meshlink_handle_t *mesh, const sockaddr_t *from, const vpn_packet_t *pkt) {
        node_t *n = NULL;
        bool hard = false;
@@ -525,42 +648,18 @@ static node_t *try_harder(meshlink_handle_t *mesh, const sockaddr_t *from, const
        return n;
 }
 
-void handle_incoming_vpn_data(event_loop_t *loop, void *data, int flags) {
-       (void)flags;
-       meshlink_handle_t *mesh = loop->data;
-       listen_socket_t *ls = data;
-       vpn_packet_t pkt;
-       char *hostname;
-       sockaddr_t from;
-       socklen_t fromlen = sizeof(from);
-       node_t *n;
-       int len;
-
-       memset(&from, 0, sizeof(from));
-
-       len = recvfrom(ls->udp.fd, pkt.data, MAXSIZE, 0, &from.sa, &fromlen);
+static void handle_one_incoming(meshlink_handle_t *mesh, listen_socket_t *ls, vpn_packet_t *pkt, sockaddr_t *from) {
+       sockaddrunmap(from); /* Some braindead IPv6 implementations do stupid things. */
 
-       if(len <= 0 || len > MAXSIZE) {
-               if(!sockwouldblock(sockerrno)) {
-                       logger(mesh, MESHLINK_ERROR, "Receiving packet failed: %s", sockstrerror(sockerrno));
-               }
-
-               return;
-       }
-
-       pkt.len = len;
-
-       sockaddrunmap(&from); /* Some braindead IPv6 implementations do stupid things. */
-
-       n = lookup_node_udp(mesh, &from);
+       node_t *n = lookup_node_udp(mesh, from);
 
        if(!n) {
-               n = try_harder(mesh, &from, &pkt);
+               n = try_harder(mesh, from, pkt);
 
                if(n) {
-                       update_node_udp(mesh, n, &from);
-               } else if(mesh->log_level >= MESHLINK_WARNING) {
-                       hostname = sockaddr2hostname(&from);
+                       update_node_udp(mesh, n, from);
+               } else if(mesh->log_level <= MESHLINK_WARNING) {
+                       char *hostname = sockaddr2hostname(from);
                        logger(mesh, MESHLINK_WARNING, "Received UDP packet from unknown source %s", hostname);
                        free(hostname);
                        return;
@@ -576,5 +675,49 @@ void handle_incoming_vpn_data(event_loop_t *loop, void *data, int flags) {
 
        n->sock = ls - mesh->listen_socket;
 
-       receive_udppacket(mesh, n, &pkt);
+       receive_udppacket(mesh, n, pkt);
+}
+
+void handle_incoming_vpn_data(event_loop_t *loop, void *data, int flags) {
+       (void)flags;
+       meshlink_handle_t *mesh = loop->data;
+       listen_socket_t *ls = data;
+
+#ifdef HAVE_RECVMMSG
+       struct mmsgs *mmsgs = mesh->in_mmsgs;
+       int count = recvmmsg(ls->udp.fd, mmsgs->hdrs, MAX_MMSG, 0, NULL);
+
+       if(count <= 0 || count > MAX_MMSG) {
+               if(!sockwouldblock(sockerrno)) {
+                       logger(mesh, MESHLINK_ERROR, "Receiving packets failed: %s", sockstrerror(sockerrno));
+               }
+
+               return;
+       }
+
+       for(int i = 0; i < count; i++) {
+               mmsgs->pkts[i].len = mmsgs->hdrs[i].msg_len;
+               handle_one_incoming(mesh, ls, &mmsgs->pkts[i], &mmsgs->addrs[i]);
+               mmsgs->hdrs[i].msg_hdr.msg_namelen = sizeof(mmsgs->addrs[i]);
+       }
+
+#else
+       vpn_packet_t pkt;
+       sockaddr_t from;
+       socklen_t fromlen = sizeof(from);
+       memset(&from, 0, sizeof(from));
+
+       ssize_t len = recvfrom(ls->udp.fd, pkt.data, MAXSIZE, 0, &from.sa, &fromlen);
+
+       if(len <= 0 || len > MAXSIZE) {
+               if(!sockwouldblock(sockerrno)) {
+                       logger(mesh, MESHLINK_ERROR, "Receiving packet failed: %s", sockstrerror(sockerrno));
+               }
+
+               return;
+       }
+
+       pkt.len = len;
+       handle_one_incoming(mesh, ls, &pkt, &from);
+#endif
 }