]> git.meshlink.io Git - meshlink/commitdiff
Implement MESHLINK_CHANNEL_FRAMED.
authorGuus Sliepen <guus@meshlink.io>
Sun, 24 May 2020 22:11:10 +0000 (00:11 +0200)
committerGuus Sliepen <guus@meshlink.io>
Sun, 24 May 2020 22:55:54 +0000 (00:55 +0200)
Both UDP and TCP style channels can now be set to use message framing.
Allowed message sizes are 0 to 65535 bytes.

For TCP, this means every meshlink_channel_send() will cause exactly one
receive callback on the other end, with the same size as the sent data.

For UDP style channels, this was already the normal behaviour (absent
packet loss), but with framing enabled UTCP will now concatenate multiple
messages in a single packet if possible.

src/meshlink++.h
src/meshlink.c
src/meshlink.h
src/meshlink.sym
src/utcp.c
src/utcp.h
src/utcp_priv.h
test/Makefile.am
test/channels-framed.c [new file with mode: 0644]
test/channels-udp-framed.c [new file with mode: 0644]
test/channels-udp.c

index bca39fd32332a2e3497e08a36f56c39056e49173..b27d9a8450debc55adb0549fdd7ffb64d7434b54 100644 (file)
@@ -823,6 +823,19 @@ public:
                meshlink_set_node_channel_timeout(handle, node, timeout);
        }
 
+       /// Set the flush timeout used for channels using message framing to the given node.
+       /** This sets the timeout after which UDP-style channels that use message framing will
+           flush partial packets.
+           *  The timeout is set for all current and future channels to the given node.
+           *
+           *  @param node         The node to set the flush timeout for.
+           *  @param timeout      The timeout in milliseconds after which partial packets will be flushed.
+           *                      The default is 5 milliseconds.
+           */
+       void set_node_flush_timeout(node *node, int timeout) {
+               meshlink_set_node_flush_timeout(handle, node, timeout);
+       }
+
        /// Open a reliable stream channel to another node.
        /** This function is called whenever a remote node wants to open a channel to the local node.
         *  The application then has to decide whether to accept or reject this channel.
index 22f5220d4beebe6c19d6612d45923602e0e78061..89680ef79a9cedc3e1f5be062050d6b47f8bfd6c 100644 (file)
@@ -3906,11 +3906,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
                return -1;
        }
 
-       if(!len) {
-               return 0;
-       }
-
-       if(!data) {
+       if(len && !data) {
                meshlink_errno = MESHLINK_EINVAL;
                return -1;
        }
@@ -4145,6 +4141,27 @@ void meshlink_set_node_channel_timeout(meshlink_handle_t *mesh, meshlink_node_t
        pthread_mutex_unlock(&mesh->mutex);
 }
 
+void meshlink_set_node_flush_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) {
+       if(!mesh || !node) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return;
+       }
+
+       node_t *n = (node_t *)node;
+
+       pthread_mutex_lock(&mesh->mutex);
+
+       if(!n->utcp) {
+               n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
+               utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
+               utcp_set_retransmit_cb(n->utcp, channel_retransmit);
+       }
+
+       utcp_set_flush_timeout(n->utcp, timeout);
+
+       pthread_mutex_unlock(&mesh->mutex);
+}
+
 void update_node_status(meshlink_handle_t *mesh, node_t *n) {
        if(n->status.reachable && mesh->channel_accept_cb && !n->utcp) {
                n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
index f504497562cf723c3e8d4f2a28315b84ea840e7e..cece7f8df2834335a6ca079d63418390964c4100 100644 (file)
@@ -1530,6 +1530,19 @@ size_t meshlink_channel_get_mss(struct meshlink_handle *mesh, struct meshlink_ch
  */
 void meshlink_set_node_channel_timeout(struct meshlink_handle *mesh, struct meshlink_node *node, int timeout);
 
+/// Set the flush timeout used for channels using message framing to the given node.
+/** This sets the timeout after which UDP-style channels that use message framing will
+    flush partial packets.
+ *  The timeout is set for all current and future channels to the given node.
+ *
+ *  \memberof meshlink_node
+ *  @param mesh         A handle which represents an instance of MeshLink.
+ *  @param node         A pointer to a struct meshlink_node describing the node to set the channel connection timeout for.
+ *  @param timeout      The timeout in milliseconds after which partial packets will be flushed.
+ *                      The default is 5 milliseconds.
+ */
+void meshlink_set_node_flush_timeout(struct meshlink_handle *mesh, struct meshlink_node *node, int timeout);
+
 /// Hint that a hostname may be found at an address
 /** This function indicates to meshlink that the given hostname is likely found
  *  at the given IP address and port.
index 16a659ee1e8198ecc375478f410be779bea6d96e..24de6b42630ca817969caf09265dab2d0d285c33 100644 (file)
@@ -79,6 +79,7 @@ meshlink_set_inviter_commits_first
 meshlink_set_log_cb
 meshlink_set_node_channel_timeout
 meshlink_set_node_duplicate_cb
+meshlink_set_node_flush_timeout
 meshlink_set_node_pmtu_cb
 meshlink_set_node_status_cb
 meshlink_set_port
index c4305cd4de4978d265155f2dcb537cd746e39251..2a1ac5b86f370ec319b833445a24c918f3bc4402 100644 (file)
@@ -195,6 +195,10 @@ static bool is_reliable(struct utcp_connection *c) {
        return c->flags & UTCP_RELIABLE;
 }
 
+static bool is_framed(struct utcp_connection *c) {
+       return c->flags & UTCP_FRAMED;
+}
+
 static int32_t seqdiff(uint32_t a, uint32_t b) {
        return a - b;
 }
@@ -599,6 +603,19 @@ static void start_retransmit_timer(struct utcp_connection *c) {
        debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
 }
 
+static void start_flush_timer(struct utcp_connection *c) {
+       clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
+
+       c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;
+
+       if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
+               c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
+               c->rtrx_timeout.tv_sec++;
+       }
+
+       debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
+}
+
 static void stop_retransmit_timer(struct utcp_connection *c) {
        timespec_clear(&c->rtrx_timeout);
        debug(c, "rtrx_timeout cleared\n");
@@ -735,108 +752,263 @@ static void ack(struct utcp_connection *c, bool sendatleastone) {
        } while(left);
 }
 
-ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
-       if(c->reapable) {
-               debug(c, "send() called on closed connection\n");
-               errno = EBADF;
-               return -1;
+static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
+       size_t rlen = len + (is_framed(c) ? 2 : 0);
+
+       if(!rlen) {
+               return 0;
        }
 
-       switch(c->state) {
-       case CLOSED:
-       case LISTEN:
-               debug(c, "send() called on unconnected connection\n");
-               errno = ENOTCONN;
-               return -1;
+       // Check if we need to be able to buffer all data
 
-       case SYN_SENT:
-       case SYN_RECEIVED:
-       case ESTABLISHED:
-       case CLOSE_WAIT:
-               break;
+       if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
+               if(rlen > c->sndbuf.maxsize) {
+                       errno = EMSGSIZE;
+                       return -1;
+               }
 
-       case FIN_WAIT_1:
-       case FIN_WAIT_2:
-       case CLOSING:
-       case LAST_ACK:
-       case TIME_WAIT:
-               debug(c, "send() called on closed connection\n");
-               errno = EPIPE;
-               return -1;
+               if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
+                       errno = EMSGSIZE;
+                       return -1;
+               }
+
+               if(rlen > buffer_free(&c->sndbuf)) {
+                       errno = EWOULDBLOCK;
+                       return 0;
+               }
        }
 
-       // Exit early if we have nothing to send.
+       // Add data to the send buffer.
 
-       if(!len) {
-               return 0;
+       if(is_framed(c)) {
+               uint16_t len16 = len;
+               buffer_put(&c->sndbuf, &len16, sizeof(len16));
+               assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
+       } else {
+               len = buffer_put(&c->sndbuf, data, len);
+
+               if(len <= 0) {
+                       errno = EWOULDBLOCK;
+                       return 0;
+               }
        }
 
-       if(!data) {
-               errno = EFAULT;
-               return -1;
+       c->snd.last += rlen;
+
+       // Don't send anything yet if the connection has not fully established yet
+
+       if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
+               return len;
        }
 
-       // Check if we need to be able to buffer all data
+       ack(c, false);
 
-       if(c->flags & UTCP_NO_PARTIAL) {
-               if(len > buffer_free(&c->sndbuf)) {
-                       if(len > c->sndbuf.maxsize) {
-                               errno = EMSGSIZE;
-                               return -1;
-                       } else {
-                               errno = EWOULDBLOCK;
-                               return 0;
-                       }
-               }
+       if(!timespec_isset(&c->rtrx_timeout)) {
+               start_retransmit_timer(c);
        }
 
-       // Add data to send buffer.
+       if(!timespec_isset(&c->conn_timeout)) {
+               clock_gettime(UTCP_CLOCK, &c->conn_timeout);
+               c->conn_timeout.tv_sec += c->utcp->timeout;
+       }
 
-       if(is_reliable(c)) {
-               len = buffer_put(&c->sndbuf, data, len);
-       } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) {
-               if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) {
-                       errno = EMSGSIZE;
-                       return -1;
+       return len;
+}
+
+
+/* In the send buffer we can have multiple frames, each prefixed with their
+   length. However, the first frame might already have been partially sent. The
+   variable c->frame_offset tracks how much of a partial frame is left at the
+   start. If it is 0, it means there is no partial frame, and the first two
+   bytes in the send buffer are the length of the first frame.
+
+   After sending an MSS sized packet, we need to calculate the new frame_offset
+   value, since it is likely that the next packet will also have a partial frame
+   at the start. We do this by scanning the previously sent packet for frame
+   headers, to find out how many bytes of the last frame are left to send.
+*/
+static void ack_unreliable_framed(struct utcp_connection *c) {
+       int32_t left = seqdiff(c->snd.last, c->snd.nxt);
+       assert(left > 0);
+
+       struct {
+               struct hdr hdr;
+               uint8_t data[];
+       } *pkt = c->utcp->pkt;
+
+       pkt->hdr.src = c->src;
+       pkt->hdr.dst = c->dst;
+       pkt->hdr.ack = c->rcv.nxt;
+       pkt->hdr.ctl = ACK | MF;
+       pkt->hdr.aux = 0;
+
+       bool sent_packet = false;
+
+       while(left >= c->utcp->mss) {
+               pkt->hdr.wnd = c->frame_offset;
+               uint32_t seglen = c->utcp->mss;
+
+               pkt->hdr.seq = c->snd.nxt;
+
+               buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
+
+               c->snd.nxt += seglen;
+               c->snd.una = c->snd.nxt;
+               left -= seglen;
+
+               print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
+               c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
+               sent_packet = true;
+
+               // Calculate the new frame offset
+               while(c->frame_offset < seglen) {
+                       uint16_t framelen;
+                       buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
+                       c->frame_offset += framelen + 2;
                }
-       } else {
-               return 0;
+
+               buffer_discard(&c->sndbuf, seglen);
+               c->frame_offset -= seglen;
+       };
+
+       if(sent_packet) {
+               if(left) {
+                       // We sent one packet but we have partial data left, (re)start the flush timer
+                       start_flush_timer(c);
+               } else {
+                       // There is no partial data in the send buffer, so stop the flush timer
+                       stop_retransmit_timer(c);
+               }
+       }
+}
+
+static void flush_unreliable_framed(struct utcp_connection *c) {
+       int32_t left = seqdiff(c->snd.last, c->snd.nxt);
+
+       /* If the MSS dropped since last time ack_unreliable_frame() was called,
+         we might now have more than one segment worth of data left.
+       */
+       if(left > c->utcp->mss) {
+               ack_unreliable_framed(c);
+               left = seqdiff(c->snd.last, c->snd.nxt);
+               assert(left <= c->utcp->mss);
        }
 
-       if(len <= 0) {
-               if(is_reliable(c)) {
+       if(left) {
+               struct {
+                       struct hdr hdr;
+                       uint8_t data[];
+               } *pkt = c->utcp->pkt;
+
+               pkt->hdr.src = c->src;
+               pkt->hdr.dst = c->dst;
+               pkt->hdr.seq = c->snd.nxt;
+               pkt->hdr.ack = c->rcv.nxt;
+               pkt->hdr.wnd = c->frame_offset;
+               pkt->hdr.ctl = ACK | MF;
+               pkt->hdr.aux = 0;
+
+               uint32_t seglen = left;
+
+               buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
+               buffer_discard(&c->sndbuf, seglen);
+
+               c->snd.nxt += seglen;
+               c->snd.una = c->snd.nxt;
+
+               print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
+               c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
+       }
+
+       c->frame_offset = 0;
+       stop_retransmit_timer(c);
+}
+
+
+static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
+       if(len > MAX_UNRELIABLE_SIZE) {
+               errno = EMSGSIZE;
+               return -1;
+       }
+
+       size_t rlen = len + (is_framed(c) ? 2 : 0);
+
+       if(rlen > buffer_free(&c->sndbuf)) {
+               if(rlen > c->sndbuf.maxsize) {
+                       errno = EMSGSIZE;
+                       return -1;
+               } else {
                        errno = EWOULDBLOCK;
                        return 0;
-               } else {
-                       return len;
                }
        }
 
-       c->snd.last += len;
-
        // Don't send anything yet if the connection has not fully established yet
 
        if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
                return len;
        }
 
-       ack(c, false);
+       if(is_framed(c)) {
+               uint16_t framelen = len;
+               buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
+       }
 
-       if(!is_reliable(c)) {
+       buffer_put(&c->sndbuf, data, len);
+
+       c->snd.last += rlen;
+
+       if(is_framed(c)) {
+               ack_unreliable_framed(c);
+       } else {
+               ack(c, false);
                c->snd.una = c->snd.nxt = c->snd.last;
                buffer_discard(&c->sndbuf, c->sndbuf.used);
        }
 
-       if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) {
-               start_retransmit_timer(c);
+       return len;
+}
+
+ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
+       if(c->reapable) {
+               debug(c, "send() called on closed connection\n");
+               errno = EBADF;
+               return -1;
        }
 
-       if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) {
-               clock_gettime(UTCP_CLOCK, &c->conn_timeout);
-               c->conn_timeout.tv_sec += c->utcp->timeout;
+       switch(c->state) {
+       case CLOSED:
+       case LISTEN:
+               debug(c, "send() called on unconnected connection\n");
+               errno = ENOTCONN;
+               return -1;
+
+       case SYN_SENT:
+       case SYN_RECEIVED:
+       case ESTABLISHED:
+       case CLOSE_WAIT:
+               break;
+
+       case FIN_WAIT_1:
+       case FIN_WAIT_2:
+       case CLOSING:
+       case LAST_ACK:
+       case TIME_WAIT:
+               debug(c, "send() called on closed connection\n");
+               errno = EPIPE;
+               return -1;
        }
 
-       return len;
+       if(!data && len) {
+               errno = EFAULT;
+               return -1;
+       }
+
+       if(is_reliable(c)) {
+               return utcp_send_reliable(c, data, len);
+       } else {
+               return utcp_send_unreliable(c, data, len);
+       }
 }
 
 static void swap_ports(struct hdr *hdr) {
@@ -899,7 +1071,7 @@ static void retransmit(struct utcp_connection *c) {
 
        struct utcp *utcp = c->utcp;
 
-       if(utcp->retransmit) {
+       if(utcp->retransmit && is_reliable(c)) {
                utcp->retransmit(c);
        }
 
@@ -942,6 +1114,11 @@ static void retransmit(struct utcp_connection *c) {
        case CLOSE_WAIT:
        case CLOSING:
        case LAST_ACK:
+               if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
+                       flush_unreliable_framed(c);
+                       return;
+               }
+
                // Send unacked data again.
                pkt->hdr.seq = c->snd.una;
                pkt->hdr.ack = c->rcv.nxt;
@@ -1100,6 +1277,13 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons
        }
 }
 
+static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
+       uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
+
+       // Put the data into the receive buffer
+       handle_out_of_order(c, offset + in_order_offset, data, len);
+}
+
 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
        if(c->recv) {
                ssize_t rxd = c->recv(c, data, len);
@@ -1135,6 +1319,50 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t
        c->rcv.nxt += len;
 }
 
+static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
+       // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
+       uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
+       handle_out_of_order(c, in_order_offset, data, len);
+
+       // While we have full frames at the start, give them to the application
+       while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
+               uint16_t framelen;
+               buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
+
+               if(framelen > c->sacks[0].len - 2) {
+                       break;
+               }
+
+               if(c->recv) {
+                       ssize_t rxd;
+                       uint32_t realoffset = c->rcvbuf.offset + 2;
+
+                       if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
+                               realoffset -= c->rcvbuf.size;
+                       }
+
+                       if(realoffset > c->rcvbuf.size - framelen) {
+                               // The buffer wraps, we need to copy
+                               uint8_t buf[framelen];
+                               buffer_copy(&c->rcvbuf, buf, 2, framelen);
+                               rxd = c->recv(c, buf, framelen);
+                       } else {
+                               // The frame is contiguous in the receive buffer
+                               rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
+                       }
+
+                       if(rxd != (ssize_t)framelen) {
+                               // TODO: handle the application not accepting all data.
+                               abort();
+                       }
+               }
+
+               sack_consume(c, framelen + 2);
+       }
+
+       c->rcv.nxt += len;
+}
+
 static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
        // Fast path for unfragmented packets
        if(!hdr->wnd && !(hdr->ctl & MF)) {
@@ -1175,18 +1403,87 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr,
        c->rcv.nxt = hdr->seq + len;
 }
 
+static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
+       bool in_order = hdr->seq == c->rcv.nxt;
+       c->rcv.nxt = hdr->seq + len;
+
+       const uint8_t *ptr = data;
+       size_t left = len;
+
+       // Does it start with a partial frame?
+       if(hdr->wnd) {
+               // Only accept the data if it is in order
+               if(in_order && c->rcvbuf.used) {
+                       // In order, append it to the receive buffer
+                       buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
+
+                       if(hdr->wnd <= len) {
+                               // We have a full frame
+                               c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
+                       }
+               }
+
+               // Exit early if there is other data in this frame
+               if(hdr->wnd > len) {
+                       if(!in_order) {
+                               buffer_clear(&c->rcvbuf);
+                       }
+
+                       return;
+               }
+
+               ptr += hdr->wnd;
+               left -= hdr->wnd;
+       }
+
+       // We now start with new frames, so clear any data in the receive buffer
+       buffer_clear(&c->rcvbuf);
+
+       // Handle whole frames
+       while(left > 2) {
+               uint16_t framelen;
+               memcpy(&framelen, ptr, sizeof(framelen));
+
+               if(left <= (size_t)framelen + 2) {
+                       break;
+               }
+
+               c->recv(c, ptr + 2, framelen);
+               ptr += framelen + 2;
+               left -= framelen + 2;
+       }
+
+       // Handle partial last frame
+       if(left) {
+               buffer_put(&c->rcvbuf, ptr, left);
+       }
+}
+
 static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
        if(!is_reliable(c)) {
-               handle_unreliable(c, hdr, data, len);
+               if(is_framed(c)) {
+                       handle_unreliable_framed(c, hdr, data, len);
+               } else {
+                       handle_unreliable(c, hdr, data, len);
+               }
+
                return;
        }
 
        uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
 
-       if(offset) {
-               handle_out_of_order(c, offset, data, len);
+       if(is_framed(c)) {
+               if(offset) {
+                       handle_out_of_order_framed(c, offset, data, len);
+               } else {
+                       handle_in_order_framed(c, data, len);
+               }
        } else {
-               handle_in_order(c, data, len);
+               if(offset) {
+                       handle_out_of_order(c, offset, data, len);
+               } else {
+                       handle_in_order(c, data, len);
+               }
        }
 }
 
@@ -1447,7 +1744,7 @@ synack:
                int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
 
                if(rcv_offset) {
-                       debug(c, "packet out of order, offset %u bytes", rcv_offset);
+                       debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
                }
 
 #endif
@@ -1938,6 +2235,10 @@ int utcp_shutdown(struct utcp_connection *c, int dir) {
 
        case SYN_RECEIVED:
        case ESTABLISHED:
+               if(!is_reliable(c) && is_framed(c)) {
+                       flush_unreliable_framed(c);
+               }
+
                set_state(c, FIN_WAIT_1);
                break;
 
@@ -1957,7 +2258,7 @@ int utcp_shutdown(struct utcp_connection *c, int dir) {
 
        c->snd.last++;
 
-       ack(c, false);
+       ack(c, !is_reliable(c));
 
        if(!timespec_isset(&c->rtrx_timeout)) {
                start_retransmit_timer(c);
@@ -2052,8 +2353,10 @@ void utcp_abort_all_connections(struct utcp *utcp) {
 }
 
 int utcp_close(struct utcp_connection *c) {
+       debug(c, "closing\n");
+
        if(c->rcvbuf.used) {
-               fprintf(stderr, "UTCP channel closed with stuff in receive buffer\n");
+               debug(c, "receive buffer not empty, resetting\n");
                return reset_connection(c) ? 0 : -1;
        }
 
@@ -2474,3 +2777,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
 void utcp_set_clock_granularity(long granularity) {
        CLOCK_GRANULARITY = granularity;
 }
+
+int utcp_get_flush_timeout(struct utcp *utcp) {
+       return utcp->flush_timeout;
+}
+
+void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
+       utcp->flush_timeout = milliseconds;
+}
index c051ff1ff460ea977573a001d78ff0ef7c16e73c..60ecdd34d24e12418b8910ef0cd9faa62e6e7aec 100644 (file)
@@ -86,6 +86,9 @@ void utcp_abort_all_connections(struct utcp *utcp);
 int utcp_get_user_timeout(struct utcp *utcp);
 void utcp_set_user_timeout(struct utcp *utcp, int seconds);
 
+int utcp_get_flush_timeout(struct utcp *utcp);
+void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds);
+
 uint16_t utcp_get_mtu(struct utcp *utcp);
 uint16_t utcp_get_mss(struct utcp *utcp);
 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu);
index 7ef147780f1b9a8ade6a2831aed70ae595c13f4b..8713d2d4e0554017b270338b406e502398a478e8 100644 (file)
@@ -137,6 +137,7 @@ struct utcp_connection {
                uint32_t irs;
        } rcv;
 
+       uint32_t frame_offset;
        int dupack;
 
        // Timers
@@ -190,6 +191,7 @@ struct utcp {
        uint16_t mtu; // The maximum size of a UTCP packet, including headers.
        uint16_t mss; // The maximum size of the payload of a UTCP packet.
        int timeout; // sec
+       int flush_timeout; // milliseconds
 
        // Connection management
 
index 28f92560071fbe800eeb29fc61fec23d9b85b3b6..9df5ca2fd3f1aa651eaf8259c282c71a9315023c 100644 (file)
@@ -9,8 +9,10 @@ TESTS = \
        channels-cornercases \
        channels-failure \
        channels-fork \
+       channels-framed \
        channels-no-partial \
        channels-udp \
+       channels-udp-framed \
        duplicate \
        encrypted \
        ephemeral \
@@ -43,8 +45,10 @@ check_PROGRAMS = \
        channels-cornercases \
        channels-failure \
        channels-fork \
+       channels-framed \
        channels-no-partial \
        channels-udp \
+       channels-udp-framed \
        duplicate \
        echo-fork \
        encrypted \
@@ -73,6 +77,9 @@ blacklist_LDADD = $(top_builddir)/src/libmeshlink.la
 channels_SOURCES = channels.c utils.c utils.h
 channels_LDADD = $(top_builddir)/src/libmeshlink.la
 
+channels_framed_SOURCES = channels-framed.c utils.c utils.h
+channels_framed_LDADD = $(top_builddir)/src/libmeshlink.la
+
 channels_aio_SOURCES = channels-aio.c utils.c utils.h
 channels_aio_LDADD = $(top_builddir)/src/libmeshlink.la
 
@@ -97,6 +104,9 @@ channels_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la
 channels_udp_SOURCES = channels-udp.c utils.c utils.h
 channels_udp_LDADD = $(top_builddir)/src/libmeshlink.la
 
+channels_udp_framed_SOURCES = channels-udp-framed.c utils.c utils.h
+channels_udp_framed_LDADD = $(top_builddir)/src/libmeshlink.la
+
 duplicate_SOURCES = duplicate.c utils.c utils.h
 duplicate_LDADD = $(top_builddir)/src/libmeshlink.la
 
diff --git a/test/channels-framed.c b/test/channels-framed.c
new file mode 100644 (file)
index 0000000..b6d9928
--- /dev/null
@@ -0,0 +1,126 @@
+#ifdef NDEBUG
+#undef NDEBUG
+#endif
+
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "../src/meshlink.h"
+#include "utils.h"
+
+static bool received_large;
+static bool received_zero;
+static size_t received;
+static struct sync_flag accept_flag;
+static struct sync_flag close_flag;
+
+static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       (void)mesh;
+
+       if(!data && !len) {
+               meshlink_channel_close(mesh, channel);
+               set_sync_flag(&close_flag, true);
+       }
+
+       if(len >= 2) {
+               uint16_t checklen;
+               memcpy(&checklen, data, sizeof(checklen));
+               assert(len == checklen);
+       }
+
+       if(len == 65535) {
+               received_large = true;
+       }
+
+       if(len == 0) {
+               received_zero = true;
+       }
+
+       received += len;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       assert(port == 1);
+       assert(!data);
+       assert(!len);
+       assert(meshlink_channel_get_flags(mesh, channel) == (MESHLINK_CHANNEL_TCP | MESHLINK_CHANNEL_FRAMED));
+       meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
+       set_sync_flag(&accept_flag, true);
+
+       return true;
+}
+
+int main(void) {
+       meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
+
+       // Open two meshlink instances
+
+       meshlink_handle_t *mesh_a, *mesh_b;
+       open_meshlink_pair(&mesh_a, &mesh_b, "channels_framed");
+       start_meshlink_pair(mesh_a, mesh_b);
+
+       // Create a channel from a to b
+
+       meshlink_set_channel_accept_cb(mesh_b, accept_cb);
+
+       meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
+       assert(b);
+
+       meshlink_channel_t *channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_TCP | MESHLINK_CHANNEL_FRAMED);
+       assert(channel);
+
+       size_t sndbuf_size = 128 * 1024;
+       meshlink_set_channel_sndbuf(mesh_a, channel, sndbuf_size);
+
+       // Check that we cannot send more than 65535 bytes without errors
+
+       char data[65535] = "";
+       assert(meshlink_channel_send(mesh_a, channel, data, 65536) == -1);
+
+       // Check that we can send 65535 bytes
+
+       uint16_t framelen = 65535;
+       memcpy(data, &framelen, sizeof(framelen));
+       assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen);
+
+       // Check that we can send zero bytes
+
+       assert(meshlink_channel_send(mesh_a, channel, data, 0) == 0);
+
+       // Send randomly sized frames from a to b
+
+       size_t total_len = framelen;
+
+       for(int j = 0; j < 2500; j++) {
+               framelen = rand() % 2048;
+               memcpy(data, &framelen, sizeof(framelen));
+
+               while(meshlink_channel_get_sendq(mesh_a, channel) > sndbuf_size - framelen - 2) {
+                       const struct timespec req = {0, 2000000};
+                       clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL);
+               }
+
+               assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen);
+               total_len += framelen;
+       }
+
+       // Closes the channel and wait for the other end to closes it as well
+
+       meshlink_channel_close(mesh_a, channel);
+       assert(wait_sync_flag(&close_flag, 10));
+
+       // Check that the clients have received all the data we sent
+
+       assert(received == total_len);
+       assert(received_large);
+       assert(received_zero);
+
+       close_meshlink_pair(mesh_a, mesh_b);
+
+       return 0;
+}
diff --git a/test/channels-udp-framed.c b/test/channels-udp-framed.c
new file mode 100644 (file)
index 0000000..9fb2697
--- /dev/null
@@ -0,0 +1,122 @@
+#ifdef NDEBUG
+#undef NDEBUG
+#endif
+
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "../src/meshlink.h"
+#include "utils.h"
+
+static bool received_large;
+static bool received_zero;
+static size_t received;
+static struct sync_flag accept_flag;
+static struct sync_flag close_flag;
+
+static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       (void)mesh;
+
+       if(!data && !len) {
+               meshlink_channel_close(mesh, channel);
+               set_sync_flag(&close_flag, true);
+       }
+
+       if(len >= 2) {
+               uint16_t checklen;
+               memcpy(&checklen, data, sizeof(checklen));
+               assert(len == checklen);
+       }
+
+       if(len == 65535) {
+               received_large = true;
+       }
+
+       if(len == 0) {
+               received_zero = true;
+       }
+
+       received += len;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       assert(port == 1);
+       assert(!data);
+       assert(!len);
+       assert(meshlink_channel_get_flags(mesh, channel) == (MESHLINK_CHANNEL_UDP | MESHLINK_CHANNEL_FRAMED));
+       meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
+       set_sync_flag(&accept_flag, true);
+
+       return true;
+}
+
+int main(void) {
+       meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
+
+       // Open two meshlink instances
+
+       meshlink_handle_t *mesh_a, *mesh_b;
+       open_meshlink_pair(&mesh_a, &mesh_b, "channels_udp_framed");
+       start_meshlink_pair(mesh_a, mesh_b);
+
+       // Create a channel from a to b
+
+       meshlink_set_channel_accept_cb(mesh_b, accept_cb);
+
+       meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
+       assert(b);
+
+       meshlink_channel_t *channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP | MESHLINK_CHANNEL_FRAMED);
+       assert(channel);
+
+       // Check that we cannot send more than 65535 bytes without errors
+
+       char data[65535] = "";
+       assert(meshlink_channel_send(mesh_a, channel, data, 65536) == -1);
+
+       // Check that we can send 65535 bytes
+
+       uint16_t framelen = 65535;
+       memcpy(data, &framelen, sizeof(framelen));
+       assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen);
+
+       // Check that we can send zero bytes
+
+       assert(meshlink_channel_send(mesh_a, channel, data, 0) == 0);
+
+       // Stream packets from a to b for 5 seconds at 40 Mbps (~1 kB * 500 Hz)
+
+       size_t total_len = framelen;
+
+       for(int j = 0; j < 2500; j++) {
+               framelen = rand() % 2048;
+               memcpy(data, &framelen, sizeof(framelen));
+               assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen);
+
+               total_len += framelen;
+
+               long msec = j % 100 ? 2 : 100;
+               const struct timespec req = {0, msec * 1000000};
+               clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL);
+       }
+
+       // Closes the channel and wait for the other end to closes it as well
+
+       meshlink_channel_close(mesh_a, channel);
+       assert(wait_sync_flag(&close_flag, 10));
+
+       // Check that the clients have received (most of) the data
+
+       assert(received <= total_len);
+       assert(received >= total_len / 2);
+       assert(received_zero);
+
+       close_meshlink_pair(mesh_a, mesh_b);
+
+       return 0;
+}
index f4ddbe2b0f1efb3988022233b89195b028183915..91da36e5071ff5f6febdb3ea774e3fe249e49232 100644 (file)
 #include "../src/meshlink.h"
 #include "utils.h"
 
-static struct sync_flag accept_flag;
-static struct sync_flag close_flag;
 
 struct client {
        meshlink_handle_t *mesh;
-       meshlink_channel_t *channel;
        size_t received;
        bool got_large_packet;
+       struct sync_flag accept_flag;
+       struct sync_flag close_flag;
 };
 
-static void server_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
-       (void)data;
-
-       // We expect no data from clients, apart from disconnections.
-       assert(len == 0);
-
-       meshlink_channel_t **c = mesh->priv;
-       int count = 0;
-
-       for(int i = 0; i < 3; i++) {
-               if(c[i] == channel) {
-                       c[i] = NULL;
-                       meshlink_channel_close(mesh, channel);
-               }
-
-               if(c[i]) {
-                       count++;
-               }
-       }
-
-       if(!count) {
-               set_sync_flag(&close_flag, true);
-       }
-}
-
-static void client_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
        (void)channel;
        (void)data;
 
        // We expect always the same amount of data from the server.
        assert(mesh->priv);
        struct client *client = mesh->priv;
+
+       if(!data && len == 0) {
+               set_sync_flag(&client->close_flag, true);
+               meshlink_channel_close(mesh, channel);
+               return;
+       }
+
        assert(len == 512 || len == 65535);
        client->received += len;
 
@@ -63,41 +44,19 @@ static void client_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *chann
        }
 }
 
-static void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) {
-       assert(mesh->priv);
-       struct client *client = mesh->priv;
-
-       if(reachable && !strcmp(node->name, "server")) {
-               assert(!client->channel);
-               client->channel = meshlink_channel_open_ex(mesh, node, 1, client_receive_cb, NULL, 0, MESHLINK_CHANNEL_UDP);
-               assert(client->channel);
-       }
-}
-
 static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
-       (void)data;
-       (void)len;
-
        assert(port == 1);
+       assert(!data);
+       assert(!len);
        assert(meshlink_channel_get_flags(mesh, channel) == MESHLINK_CHANNEL_UDP);
-       meshlink_set_channel_receive_cb(mesh, channel, server_receive_cb);
 
        assert(mesh->priv);
-       meshlink_channel_t **c = mesh->priv;
-
-       for(int i = 0; i < 3; i++) {
-               if(c[i] == NULL) {
-                       c[i] = channel;
+       struct client *client = mesh->priv;
 
-                       if(i == 2) {
-                               set_sync_flag(&accept_flag, true);
-                       }
+       meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
+       set_sync_flag(&client->accept_flag, true);
 
-                       return true;
-               }
-       }
-
-       return false;
+       return true;
 }
 
 int main(void) {
@@ -115,7 +74,6 @@ int main(void) {
        assert(server);
        meshlink_enable_discovery(server, false);
        server->priv = channels;
-       meshlink_set_channel_accept_cb(server, accept_cb);
        assert(meshlink_start(server));
 
        for(int i = 0; i < 3; i++) {
@@ -126,18 +84,24 @@ int main(void) {
                assert(clients[i].mesh);
                clients[i].mesh->priv = &clients[i];
                meshlink_enable_discovery(clients[i].mesh, false);
+               meshlink_set_channel_accept_cb(clients[i].mesh, accept_cb);
                link_meshlink_pair(server, clients[i].mesh);
-               meshlink_set_node_status_cb(clients[i].mesh, status_cb);
                assert(meshlink_start(clients[i].mesh));
        }
 
-       // Wait for all three channels to connect
-
-       assert(wait_sync_flag(&accept_flag, 10));
+       // Open channels
 
        for(int i = 0; i < 3; i++) {
+               meshlink_node_t *peer = meshlink_get_node(server, names[i]);
+               assert(peer);
+               channels[i] = meshlink_channel_open_ex(server, peer, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP);
                assert(channels[i]);
-               assert(clients[i].channel);
+       }
+
+       // Wait for all three channels to connect
+
+       for(int i = 0; i < 3; i++) {
+               assert(wait_sync_flag(&clients[i].accept_flag, 10));
        }
 
        // Check that we can send up to 65535 bytes without errors
@@ -148,6 +112,12 @@ int main(void) {
                assert(meshlink_channel_send(server, channels[i], large_data, sizeof(large_data)) == sizeof(large_data));
        }
 
+       // Check that we can send zero bytes without errors
+
+       for(int i = 0; i < 3; i++) {
+               assert(meshlink_channel_send(server, channels[i], large_data, 0) == 0);
+       }
+
        // Assert that any larger packets are not allowed
 
        assert(meshlink_channel_send(server, channels[0], large_data, 65536) == -1);
@@ -159,21 +129,23 @@ int main(void) {
 
        for(int j = 0; j < 2500; j++) {
                for(int i = 0; i < 3; i++) {
-                       assert(meshlink_channel_send(server, channels[i], data, sizeof(data)) == sizeof(data));
+                       size_t result = meshlink_channel_send(server, channels[i], data, sizeof(data));
+                       assert(result == sizeof(data));
                }
 
                const struct timespec req = {0, 2000000};
                clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL);
        }
 
-       // Let the clients close the channels
+       // Close the channels
 
        for(int i = 0; i < 3; i++) {
-               meshlink_channel_close(clients[i].mesh, clients[i].channel);
-               meshlink_set_node_status_cb(clients[i].mesh, NULL);
+               meshlink_channel_close(server, channels[i]);
        }
 
-       assert(wait_sync_flag(&close_flag, 10));
+       for(int i = 0; i < 3; i++) {
+               assert(wait_sync_flag(&clients[i].close_flag, 10));
+       }
 
        // Check that the clients have received (most of) the data
 
@@ -181,12 +153,16 @@ int main(void) {
                fprintf(stderr, "%s received %zu\n", clients[i].mesh->name, clients[i].received);
        }
 
+       bool got_large_packet = false;
+
        for(int i = 0; i < 3; i++) {
                assert(clients[i].received >= 1000000);
                assert(clients[i].received <= 1345536);
-               assert(clients[i].got_large_packet);
+               got_large_packet |= clients[i].got_large_packet;
        }
 
+       assert(got_large_packet);
+
        // Clean up.
 
        for(int i = 0; i < 3; i++) {