From d90d88adfbfc340e8eb473e0265f2741bf3f40a4 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Mon, 25 May 2020 00:11:10 +0200 Subject: [PATCH] Implement MESHLINK_CHANNEL_FRAMED. Both UDP and TCP style channels can now be set to use message framing. Allowed message sizes are 0 to 65535 bytes. For TCP, this means every meshlink_channel_send() will cause exactly one receive callback on the other end, with the same size as the sent data. For UDP style channels, this was already the normal behaviour (absent packet loss), but with framing enabled UTCP will now concatenate multiple messages in a single packet if possible. --- src/meshlink++.h | 13 ++ src/meshlink.c | 27 ++- src/meshlink.h | 13 ++ src/meshlink.sym | 1 + src/utcp.c | 455 +++++++++++++++++++++++++++++++------ src/utcp.h | 3 + src/utcp_priv.h | 2 + test/Makefile.am | 10 + test/channels-framed.c | 126 ++++++++++ test/channels-udp-framed.c | 122 ++++++++++ test/channels-udp.c | 114 ++++------ 11 files changed, 740 insertions(+), 146 deletions(-) create mode 100644 test/channels-framed.c create mode 100644 test/channels-udp-framed.c diff --git a/src/meshlink++.h b/src/meshlink++.h index bca39fd3..b27d9a84 100644 --- a/src/meshlink++.h +++ b/src/meshlink++.h @@ -823,6 +823,19 @@ public: meshlink_set_node_channel_timeout(handle, node, timeout); } + /// Set the flush timeout used for channels using message framing to the given node. + /** This sets the timeout after which UDP-style channels that use message framing will + flush partial packets. + * The timeout is set for all current and future channels to the given node. + * + * @param node The node to set the flush timeout for. + * @param timeout The timeout in milliseconds after which partial packets will be flushed. + * The default is 5 milliseconds. + */ + void set_node_flush_timeout(node *node, int timeout) { + meshlink_set_node_flush_timeout(handle, node, timeout); + } + /// Open a reliable stream channel to another node. /** This function is called whenever a remote node wants to open a channel to the local node. * The application then has to decide whether to accept or reject this channel. diff --git a/src/meshlink.c b/src/meshlink.c index 22f5220d..89680ef7 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3906,11 +3906,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann return -1; } - if(!len) { - return 0; - } - - if(!data) { + if(len && !data) { meshlink_errno = MESHLINK_EINVAL; return -1; } @@ -4145,6 +4141,27 @@ void meshlink_set_node_channel_timeout(meshlink_handle_t *mesh, meshlink_node_t pthread_mutex_unlock(&mesh->mutex); } +void meshlink_set_node_flush_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) { + if(!mesh || !node) { + meshlink_errno = MESHLINK_EINVAL; + return; + } + + node_t *n = (node_t *)node; + + pthread_mutex_lock(&mesh->mutex); + + if(!n->utcp) { + n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n); + utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t)); + utcp_set_retransmit_cb(n->utcp, channel_retransmit); + } + + utcp_set_flush_timeout(n->utcp, timeout); + + pthread_mutex_unlock(&mesh->mutex); +} + void update_node_status(meshlink_handle_t *mesh, node_t *n) { if(n->status.reachable && mesh->channel_accept_cb && !n->utcp) { n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n); diff --git a/src/meshlink.h b/src/meshlink.h index f5044975..cece7f8d 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -1530,6 +1530,19 @@ size_t meshlink_channel_get_mss(struct meshlink_handle *mesh, struct meshlink_ch */ void meshlink_set_node_channel_timeout(struct meshlink_handle *mesh, struct meshlink_node *node, int timeout); +/// Set the flush timeout used for channels using message framing to the given node. +/** This sets the timeout after which UDP-style channels that use message framing will + flush partial packets. + * The timeout is set for all current and future channels to the given node. + * + * \memberof meshlink_node + * @param mesh A handle which represents an instance of MeshLink. + * @param node A pointer to a struct meshlink_node describing the node to set the channel connection timeout for. + * @param timeout The timeout in milliseconds after which partial packets will be flushed. + * The default is 5 milliseconds. + */ +void meshlink_set_node_flush_timeout(struct meshlink_handle *mesh, struct meshlink_node *node, int timeout); + /// Hint that a hostname may be found at an address /** This function indicates to meshlink that the given hostname is likely found * at the given IP address and port. diff --git a/src/meshlink.sym b/src/meshlink.sym index 16a659ee..24de6b42 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -79,6 +79,7 @@ meshlink_set_inviter_commits_first meshlink_set_log_cb meshlink_set_node_channel_timeout meshlink_set_node_duplicate_cb +meshlink_set_node_flush_timeout meshlink_set_node_pmtu_cb meshlink_set_node_status_cb meshlink_set_port diff --git a/src/utcp.c b/src/utcp.c index c4305cd4..2a1ac5b8 100644 --- a/src/utcp.c +++ b/src/utcp.c @@ -195,6 +195,10 @@ static bool is_reliable(struct utcp_connection *c) { return c->flags & UTCP_RELIABLE; } +static bool is_framed(struct utcp_connection *c) { + return c->flags & UTCP_FRAMED; +} + static int32_t seqdiff(uint32_t a, uint32_t b) { return a - b; } @@ -599,6 +603,19 @@ static void start_retransmit_timer(struct utcp_connection *c) { debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec); } +static void start_flush_timer(struct utcp_connection *c) { + clock_gettime(UTCP_CLOCK, &c->rtrx_timeout); + + c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000; + + if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) { + c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC; + c->rtrx_timeout.tv_sec++; + } + + debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec); +} + static void stop_retransmit_timer(struct utcp_connection *c) { timespec_clear(&c->rtrx_timeout); debug(c, "rtrx_timeout cleared\n"); @@ -735,108 +752,263 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { } while(left); } -ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { - if(c->reapable) { - debug(c, "send() called on closed connection\n"); - errno = EBADF; - return -1; +static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) { + size_t rlen = len + (is_framed(c) ? 2 : 0); + + if(!rlen) { + return 0; } - switch(c->state) { - case CLOSED: - case LISTEN: - debug(c, "send() called on unconnected connection\n"); - errno = ENOTCONN; - return -1; + // Check if we need to be able to buffer all data - case SYN_SENT: - case SYN_RECEIVED: - case ESTABLISHED: - case CLOSE_WAIT: - break; + if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) { + if(rlen > c->sndbuf.maxsize) { + errno = EMSGSIZE; + return -1; + } - case FIN_WAIT_1: - case FIN_WAIT_2: - case CLOSING: - case LAST_ACK: - case TIME_WAIT: - debug(c, "send() called on closed connection\n"); - errno = EPIPE; - return -1; + if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) { + errno = EMSGSIZE; + return -1; + } + + if(rlen > buffer_free(&c->sndbuf)) { + errno = EWOULDBLOCK; + return 0; + } } - // Exit early if we have nothing to send. + // Add data to the send buffer. - if(!len) { - return 0; + if(is_framed(c)) { + uint16_t len16 = len; + buffer_put(&c->sndbuf, &len16, sizeof(len16)); + assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len); + } else { + len = buffer_put(&c->sndbuf, data, len); + + if(len <= 0) { + errno = EWOULDBLOCK; + return 0; + } } - if(!data) { - errno = EFAULT; - return -1; + c->snd.last += rlen; + + // Don't send anything yet if the connection has not fully established yet + + if(c->state == SYN_SENT || c->state == SYN_RECEIVED) { + return len; } - // Check if we need to be able to buffer all data + ack(c, false); - if(c->flags & UTCP_NO_PARTIAL) { - if(len > buffer_free(&c->sndbuf)) { - if(len > c->sndbuf.maxsize) { - errno = EMSGSIZE; - return -1; - } else { - errno = EWOULDBLOCK; - return 0; - } - } + if(!timespec_isset(&c->rtrx_timeout)) { + start_retransmit_timer(c); } - // Add data to send buffer. + if(!timespec_isset(&c->conn_timeout)) { + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += c->utcp->timeout; + } - if(is_reliable(c)) { - len = buffer_put(&c->sndbuf, data, len); - } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { - if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) { - errno = EMSGSIZE; - return -1; + return len; +} + + +/* In the send buffer we can have multiple frames, each prefixed with their + length. However, the first frame might already have been partially sent. The + variable c->frame_offset tracks how much of a partial frame is left at the + start. If it is 0, it means there is no partial frame, and the first two + bytes in the send buffer are the length of the first frame. + + After sending an MSS sized packet, we need to calculate the new frame_offset + value, since it is likely that the next packet will also have a partial frame + at the start. We do this by scanning the previously sent packet for frame + headers, to find out how many bytes of the last frame are left to send. +*/ +static void ack_unreliable_framed(struct utcp_connection *c) { + int32_t left = seqdiff(c->snd.last, c->snd.nxt); + assert(left > 0); + + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.ctl = ACK | MF; + pkt->hdr.aux = 0; + + bool sent_packet = false; + + while(left >= c->utcp->mss) { + pkt->hdr.wnd = c->frame_offset; + uint32_t seglen = c->utcp->mss; + + pkt->hdr.seq = c->snd.nxt; + + buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen); + + c->snd.nxt += seglen; + c->snd.una = c->snd.nxt; + left -= seglen; + + print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); + c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + sent_packet = true; + + // Calculate the new frame offset + while(c->frame_offset < seglen) { + uint16_t framelen; + buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen)); + c->frame_offset += framelen + 2; } - } else { - return 0; + + buffer_discard(&c->sndbuf, seglen); + c->frame_offset -= seglen; + }; + + if(sent_packet) { + if(left) { + // We sent one packet but we have partial data left, (re)start the flush timer + start_flush_timer(c); + } else { + // There is no partial data in the send buffer, so stop the flush timer + stop_retransmit_timer(c); + } + } +} + +static void flush_unreliable_framed(struct utcp_connection *c) { + int32_t left = seqdiff(c->snd.last, c->snd.nxt); + + /* If the MSS dropped since last time ack_unreliable_frame() was called, + we might now have more than one segment worth of data left. + */ + if(left > c->utcp->mss) { + ack_unreliable_framed(c); + left = seqdiff(c->snd.last, c->snd.nxt); + assert(left <= c->utcp->mss); } - if(len <= 0) { - if(is_reliable(c)) { + if(left) { + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.seq = c->snd.nxt; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.wnd = c->frame_offset; + pkt->hdr.ctl = ACK | MF; + pkt->hdr.aux = 0; + + uint32_t seglen = left; + + buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen); + buffer_discard(&c->sndbuf, seglen); + + c->snd.nxt += seglen; + c->snd.una = c->snd.nxt; + + print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); + c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + } + + c->frame_offset = 0; + stop_retransmit_timer(c); +} + + +static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) { + if(len > MAX_UNRELIABLE_SIZE) { + errno = EMSGSIZE; + return -1; + } + + size_t rlen = len + (is_framed(c) ? 2 : 0); + + if(rlen > buffer_free(&c->sndbuf)) { + if(rlen > c->sndbuf.maxsize) { + errno = EMSGSIZE; + return -1; + } else { errno = EWOULDBLOCK; return 0; - } else { - return len; } } - c->snd.last += len; - // Don't send anything yet if the connection has not fully established yet if(c->state == SYN_SENT || c->state == SYN_RECEIVED) { return len; } - ack(c, false); + if(is_framed(c)) { + uint16_t framelen = len; + buffer_put(&c->sndbuf, &framelen, sizeof(framelen)); + } - if(!is_reliable(c)) { + buffer_put(&c->sndbuf, data, len); + + c->snd.last += rlen; + + if(is_framed(c)) { + ack_unreliable_framed(c); + } else { + ack(c, false); c->snd.una = c->snd.nxt = c->snd.last; buffer_discard(&c->sndbuf, c->sndbuf.used); } - if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { - start_retransmit_timer(c); + return len; +} + +ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { + if(c->reapable) { + debug(c, "send() called on closed connection\n"); + errno = EBADF; + return -1; } - if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) { - clock_gettime(UTCP_CLOCK, &c->conn_timeout); - c->conn_timeout.tv_sec += c->utcp->timeout; + switch(c->state) { + case CLOSED: + case LISTEN: + debug(c, "send() called on unconnected connection\n"); + errno = ENOTCONN; + return -1; + + case SYN_SENT: + case SYN_RECEIVED: + case ESTABLISHED: + case CLOSE_WAIT: + break; + + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + debug(c, "send() called on closed connection\n"); + errno = EPIPE; + return -1; } - return len; + if(!data && len) { + errno = EFAULT; + return -1; + } + + if(is_reliable(c)) { + return utcp_send_reliable(c, data, len); + } else { + return utcp_send_unreliable(c, data, len); + } } static void swap_ports(struct hdr *hdr) { @@ -899,7 +1071,7 @@ static void retransmit(struct utcp_connection *c) { struct utcp *utcp = c->utcp; - if(utcp->retransmit) { + if(utcp->retransmit && is_reliable(c)) { utcp->retransmit(c); } @@ -942,6 +1114,11 @@ static void retransmit(struct utcp_connection *c) { case CLOSE_WAIT: case CLOSING: case LAST_ACK: + if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) { + flush_unreliable_framed(c); + return; + } + // Send unacked data again. pkt->hdr.seq = c->snd.una; pkt->hdr.ack = c->rcv.nxt; @@ -1100,6 +1277,13 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons } } +static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) { + uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0; + + // Put the data into the receive buffer + handle_out_of_order(c, offset + in_order_offset, data, len); +} + static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { if(c->recv) { ssize_t rxd = c->recv(c, data, len); @@ -1135,6 +1319,50 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t c->rcv.nxt += len; } +static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) { + // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame. + uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0; + handle_out_of_order(c, in_order_offset, data, len); + + // While we have full frames at the start, give them to the application + while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) { + uint16_t framelen; + buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen)); + + if(framelen > c->sacks[0].len - 2) { + break; + } + + if(c->recv) { + ssize_t rxd; + uint32_t realoffset = c->rcvbuf.offset + 2; + + if(c->rcvbuf.size - c->rcvbuf.offset <= 2) { + realoffset -= c->rcvbuf.size; + } + + if(realoffset > c->rcvbuf.size - framelen) { + // The buffer wraps, we need to copy + uint8_t buf[framelen]; + buffer_copy(&c->rcvbuf, buf, 2, framelen); + rxd = c->recv(c, buf, framelen); + } else { + // The frame is contiguous in the receive buffer + rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen); + } + + if(rxd != (ssize_t)framelen) { + // TODO: handle the application not accepting all data. + abort(); + } + } + + sack_consume(c, framelen + 2); + } + + c->rcv.nxt += len; +} + static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { // Fast path for unfragmented packets if(!hdr->wnd && !(hdr->ctl & MF)) { @@ -1175,18 +1403,87 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, c->rcv.nxt = hdr->seq + len; } +static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { + bool in_order = hdr->seq == c->rcv.nxt; + c->rcv.nxt = hdr->seq + len; + + const uint8_t *ptr = data; + size_t left = len; + + // Does it start with a partial frame? + if(hdr->wnd) { + // Only accept the data if it is in order + if(in_order && c->rcvbuf.used) { + // In order, append it to the receive buffer + buffer_put(&c->rcvbuf, data, min(hdr->wnd, len)); + + if(hdr->wnd <= len) { + // We have a full frame + c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2); + } + } + + // Exit early if there is other data in this frame + if(hdr->wnd > len) { + if(!in_order) { + buffer_clear(&c->rcvbuf); + } + + return; + } + + ptr += hdr->wnd; + left -= hdr->wnd; + } + + // We now start with new frames, so clear any data in the receive buffer + buffer_clear(&c->rcvbuf); + + // Handle whole frames + while(left > 2) { + uint16_t framelen; + memcpy(&framelen, ptr, sizeof(framelen)); + + if(left <= (size_t)framelen + 2) { + break; + } + + c->recv(c, ptr + 2, framelen); + ptr += framelen + 2; + left -= framelen + 2; + } + + // Handle partial last frame + if(left) { + buffer_put(&c->rcvbuf, ptr, left); + } +} + static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { if(!is_reliable(c)) { - handle_unreliable(c, hdr, data, len); + if(is_framed(c)) { + handle_unreliable_framed(c, hdr, data, len); + } else { + handle_unreliable(c, hdr, data, len); + } + return; } uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt); - if(offset) { - handle_out_of_order(c, offset, data, len); + if(is_framed(c)) { + if(offset) { + handle_out_of_order_framed(c, offset, data, len); + } else { + handle_in_order_framed(c, data, len); + } } else { - handle_in_order(c, data, len); + if(offset) { + handle_out_of_order(c, offset, data, len); + } else { + handle_in_order(c, data, len); + } } } @@ -1447,7 +1744,7 @@ synack: int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); if(rcv_offset) { - debug(c, "packet out of order, offset %u bytes", rcv_offset); + debug(c, "packet out of order, offset %u bytes\n", rcv_offset); } #endif @@ -1938,6 +2235,10 @@ int utcp_shutdown(struct utcp_connection *c, int dir) { case SYN_RECEIVED: case ESTABLISHED: + if(!is_reliable(c) && is_framed(c)) { + flush_unreliable_framed(c); + } + set_state(c, FIN_WAIT_1); break; @@ -1957,7 +2258,7 @@ int utcp_shutdown(struct utcp_connection *c, int dir) { c->snd.last++; - ack(c, false); + ack(c, !is_reliable(c)); if(!timespec_isset(&c->rtrx_timeout)) { start_retransmit_timer(c); @@ -2052,8 +2353,10 @@ void utcp_abort_all_connections(struct utcp *utcp) { } int utcp_close(struct utcp_connection *c) { + debug(c, "closing\n"); + if(c->rcvbuf.used) { - fprintf(stderr, "UTCP channel closed with stuff in receive buffer\n"); + debug(c, "receive buffer not empty, resetting\n"); return reset_connection(c) ? 0 : -1; } @@ -2474,3 +2777,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) { void utcp_set_clock_granularity(long granularity) { CLOCK_GRANULARITY = granularity; } + +int utcp_get_flush_timeout(struct utcp *utcp) { + return utcp->flush_timeout; +} + +void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) { + utcp->flush_timeout = milliseconds; +} diff --git a/src/utcp.h b/src/utcp.h index c051ff1f..60ecdd34 100644 --- a/src/utcp.h +++ b/src/utcp.h @@ -86,6 +86,9 @@ void utcp_abort_all_connections(struct utcp *utcp); int utcp_get_user_timeout(struct utcp *utcp); void utcp_set_user_timeout(struct utcp *utcp, int seconds); +int utcp_get_flush_timeout(struct utcp *utcp); +void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds); + uint16_t utcp_get_mtu(struct utcp *utcp); uint16_t utcp_get_mss(struct utcp *utcp); void utcp_set_mtu(struct utcp *utcp, uint16_t mtu); diff --git a/src/utcp_priv.h b/src/utcp_priv.h index 7ef14778..8713d2d4 100644 --- a/src/utcp_priv.h +++ b/src/utcp_priv.h @@ -137,6 +137,7 @@ struct utcp_connection { uint32_t irs; } rcv; + uint32_t frame_offset; int dupack; // Timers @@ -190,6 +191,7 @@ struct utcp { uint16_t mtu; // The maximum size of a UTCP packet, including headers. uint16_t mss; // The maximum size of the payload of a UTCP packet. int timeout; // sec + int flush_timeout; // milliseconds // Connection management diff --git a/test/Makefile.am b/test/Makefile.am index 28f92560..9df5ca2f 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -9,8 +9,10 @@ TESTS = \ channels-cornercases \ channels-failure \ channels-fork \ + channels-framed \ channels-no-partial \ channels-udp \ + channels-udp-framed \ duplicate \ encrypted \ ephemeral \ @@ -43,8 +45,10 @@ check_PROGRAMS = \ channels-cornercases \ channels-failure \ channels-fork \ + channels-framed \ channels-no-partial \ channels-udp \ + channels-udp-framed \ duplicate \ echo-fork \ encrypted \ @@ -73,6 +77,9 @@ blacklist_LDADD = $(top_builddir)/src/libmeshlink.la channels_SOURCES = channels.c utils.c utils.h channels_LDADD = $(top_builddir)/src/libmeshlink.la +channels_framed_SOURCES = channels-framed.c utils.c utils.h +channels_framed_LDADD = $(top_builddir)/src/libmeshlink.la + channels_aio_SOURCES = channels-aio.c utils.c utils.h channels_aio_LDADD = $(top_builddir)/src/libmeshlink.la @@ -97,6 +104,9 @@ channels_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la channels_udp_SOURCES = channels-udp.c utils.c utils.h channels_udp_LDADD = $(top_builddir)/src/libmeshlink.la +channels_udp_framed_SOURCES = channels-udp-framed.c utils.c utils.h +channels_udp_framed_LDADD = $(top_builddir)/src/libmeshlink.la + duplicate_SOURCES = duplicate.c utils.c utils.h duplicate_LDADD = $(top_builddir)/src/libmeshlink.la diff --git a/test/channels-framed.c b/test/channels-framed.c new file mode 100644 index 00000000..b6d9928b --- /dev/null +++ b/test/channels-framed.c @@ -0,0 +1,126 @@ +#ifdef NDEBUG +#undef NDEBUG +#endif + +#include +#include +#include +#include +#include +#include +#include + +#include "../src/meshlink.h" +#include "utils.h" + +static bool received_large; +static bool received_zero; +static size_t received; +static struct sync_flag accept_flag; +static struct sync_flag close_flag; + +static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { + (void)mesh; + + if(!data && !len) { + meshlink_channel_close(mesh, channel); + set_sync_flag(&close_flag, true); + } + + if(len >= 2) { + uint16_t checklen; + memcpy(&checklen, data, sizeof(checklen)); + assert(len == checklen); + } + + if(len == 65535) { + received_large = true; + } + + if(len == 0) { + received_zero = true; + } + + received += len; +} + +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + assert(port == 1); + assert(!data); + assert(!len); + assert(meshlink_channel_get_flags(mesh, channel) == (MESHLINK_CHANNEL_TCP | MESHLINK_CHANNEL_FRAMED)); + meshlink_set_channel_receive_cb(mesh, channel, receive_cb); + set_sync_flag(&accept_flag, true); + + return true; +} + +int main(void) { + meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb); + + // Open two meshlink instances + + meshlink_handle_t *mesh_a, *mesh_b; + open_meshlink_pair(&mesh_a, &mesh_b, "channels_framed"); + start_meshlink_pair(mesh_a, mesh_b); + + // Create a channel from a to b + + meshlink_set_channel_accept_cb(mesh_b, accept_cb); + + meshlink_node_t *b = meshlink_get_node(mesh_a, "b"); + assert(b); + + meshlink_channel_t *channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_TCP | MESHLINK_CHANNEL_FRAMED); + assert(channel); + + size_t sndbuf_size = 128 * 1024; + meshlink_set_channel_sndbuf(mesh_a, channel, sndbuf_size); + + // Check that we cannot send more than 65535 bytes without errors + + char data[65535] = ""; + assert(meshlink_channel_send(mesh_a, channel, data, 65536) == -1); + + // Check that we can send 65535 bytes + + uint16_t framelen = 65535; + memcpy(data, &framelen, sizeof(framelen)); + assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen); + + // Check that we can send zero bytes + + assert(meshlink_channel_send(mesh_a, channel, data, 0) == 0); + + // Send randomly sized frames from a to b + + size_t total_len = framelen; + + for(int j = 0; j < 2500; j++) { + framelen = rand() % 2048; + memcpy(data, &framelen, sizeof(framelen)); + + while(meshlink_channel_get_sendq(mesh_a, channel) > sndbuf_size - framelen - 2) { + const struct timespec req = {0, 2000000}; + clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL); + } + + assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen); + total_len += framelen; + } + + // Closes the channel and wait for the other end to closes it as well + + meshlink_channel_close(mesh_a, channel); + assert(wait_sync_flag(&close_flag, 10)); + + // Check that the clients have received all the data we sent + + assert(received == total_len); + assert(received_large); + assert(received_zero); + + close_meshlink_pair(mesh_a, mesh_b); + + return 0; +} diff --git a/test/channels-udp-framed.c b/test/channels-udp-framed.c new file mode 100644 index 00000000..9fb26971 --- /dev/null +++ b/test/channels-udp-framed.c @@ -0,0 +1,122 @@ +#ifdef NDEBUG +#undef NDEBUG +#endif + +#include +#include +#include +#include +#include +#include +#include + +#include "../src/meshlink.h" +#include "utils.h" + +static bool received_large; +static bool received_zero; +static size_t received; +static struct sync_flag accept_flag; +static struct sync_flag close_flag; + +static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { + (void)mesh; + + if(!data && !len) { + meshlink_channel_close(mesh, channel); + set_sync_flag(&close_flag, true); + } + + if(len >= 2) { + uint16_t checklen; + memcpy(&checklen, data, sizeof(checklen)); + assert(len == checklen); + } + + if(len == 65535) { + received_large = true; + } + + if(len == 0) { + received_zero = true; + } + + received += len; +} + +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + assert(port == 1); + assert(!data); + assert(!len); + assert(meshlink_channel_get_flags(mesh, channel) == (MESHLINK_CHANNEL_UDP | MESHLINK_CHANNEL_FRAMED)); + meshlink_set_channel_receive_cb(mesh, channel, receive_cb); + set_sync_flag(&accept_flag, true); + + return true; +} + +int main(void) { + meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb); + + // Open two meshlink instances + + meshlink_handle_t *mesh_a, *mesh_b; + open_meshlink_pair(&mesh_a, &mesh_b, "channels_udp_framed"); + start_meshlink_pair(mesh_a, mesh_b); + + // Create a channel from a to b + + meshlink_set_channel_accept_cb(mesh_b, accept_cb); + + meshlink_node_t *b = meshlink_get_node(mesh_a, "b"); + assert(b); + + meshlink_channel_t *channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP | MESHLINK_CHANNEL_FRAMED); + assert(channel); + + // Check that we cannot send more than 65535 bytes without errors + + char data[65535] = ""; + assert(meshlink_channel_send(mesh_a, channel, data, 65536) == -1); + + // Check that we can send 65535 bytes + + uint16_t framelen = 65535; + memcpy(data, &framelen, sizeof(framelen)); + assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen); + + // Check that we can send zero bytes + + assert(meshlink_channel_send(mesh_a, channel, data, 0) == 0); + + // Stream packets from a to b for 5 seconds at 40 Mbps (~1 kB * 500 Hz) + + size_t total_len = framelen; + + for(int j = 0; j < 2500; j++) { + framelen = rand() % 2048; + memcpy(data, &framelen, sizeof(framelen)); + assert(meshlink_channel_send(mesh_a, channel, data, framelen) == framelen); + + total_len += framelen; + + long msec = j % 100 ? 2 : 100; + const struct timespec req = {0, msec * 1000000}; + clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL); + } + + // Closes the channel and wait for the other end to closes it as well + + meshlink_channel_close(mesh_a, channel); + assert(wait_sync_flag(&close_flag, 10)); + + // Check that the clients have received (most of) the data + + assert(received <= total_len); + assert(received >= total_len / 2); + assert(received_zero); + + close_meshlink_pair(mesh_a, mesh_b); + + return 0; +} diff --git a/test/channels-udp.c b/test/channels-udp.c index f4ddbe2b..91da36e5 100644 --- a/test/channels-udp.c +++ b/test/channels-udp.c @@ -13,48 +13,29 @@ #include "../src/meshlink.h" #include "utils.h" -static struct sync_flag accept_flag; -static struct sync_flag close_flag; struct client { meshlink_handle_t *mesh; - meshlink_channel_t *channel; size_t received; bool got_large_packet; + struct sync_flag accept_flag; + struct sync_flag close_flag; }; -static void server_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { - (void)data; - - // We expect no data from clients, apart from disconnections. - assert(len == 0); - - meshlink_channel_t **c = mesh->priv; - int count = 0; - - for(int i = 0; i < 3; i++) { - if(c[i] == channel) { - c[i] = NULL; - meshlink_channel_close(mesh, channel); - } - - if(c[i]) { - count++; - } - } - - if(!count) { - set_sync_flag(&close_flag, true); - } -} - -static void client_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { +static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { (void)channel; (void)data; // We expect always the same amount of data from the server. assert(mesh->priv); struct client *client = mesh->priv; + + if(!data && len == 0) { + set_sync_flag(&client->close_flag, true); + meshlink_channel_close(mesh, channel); + return; + } + assert(len == 512 || len == 65535); client->received += len; @@ -63,41 +44,19 @@ static void client_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *chann } } -static void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) { - assert(mesh->priv); - struct client *client = mesh->priv; - - if(reachable && !strcmp(node->name, "server")) { - assert(!client->channel); - client->channel = meshlink_channel_open_ex(mesh, node, 1, client_receive_cb, NULL, 0, MESHLINK_CHANNEL_UDP); - assert(client->channel); - } -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)data; - (void)len; - assert(port == 1); + assert(!data); + assert(!len); assert(meshlink_channel_get_flags(mesh, channel) == MESHLINK_CHANNEL_UDP); - meshlink_set_channel_receive_cb(mesh, channel, server_receive_cb); assert(mesh->priv); - meshlink_channel_t **c = mesh->priv; - - for(int i = 0; i < 3; i++) { - if(c[i] == NULL) { - c[i] = channel; + struct client *client = mesh->priv; - if(i == 2) { - set_sync_flag(&accept_flag, true); - } + meshlink_set_channel_receive_cb(mesh, channel, receive_cb); + set_sync_flag(&client->accept_flag, true); - return true; - } - } - - return false; + return true; } int main(void) { @@ -115,7 +74,6 @@ int main(void) { assert(server); meshlink_enable_discovery(server, false); server->priv = channels; - meshlink_set_channel_accept_cb(server, accept_cb); assert(meshlink_start(server)); for(int i = 0; i < 3; i++) { @@ -126,18 +84,24 @@ int main(void) { assert(clients[i].mesh); clients[i].mesh->priv = &clients[i]; meshlink_enable_discovery(clients[i].mesh, false); + meshlink_set_channel_accept_cb(clients[i].mesh, accept_cb); link_meshlink_pair(server, clients[i].mesh); - meshlink_set_node_status_cb(clients[i].mesh, status_cb); assert(meshlink_start(clients[i].mesh)); } - // Wait for all three channels to connect - - assert(wait_sync_flag(&accept_flag, 10)); + // Open channels for(int i = 0; i < 3; i++) { + meshlink_node_t *peer = meshlink_get_node(server, names[i]); + assert(peer); + channels[i] = meshlink_channel_open_ex(server, peer, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP); assert(channels[i]); - assert(clients[i].channel); + } + + // Wait for all three channels to connect + + for(int i = 0; i < 3; i++) { + assert(wait_sync_flag(&clients[i].accept_flag, 10)); } // Check that we can send up to 65535 bytes without errors @@ -148,6 +112,12 @@ int main(void) { assert(meshlink_channel_send(server, channels[i], large_data, sizeof(large_data)) == sizeof(large_data)); } + // Check that we can send zero bytes without errors + + for(int i = 0; i < 3; i++) { + assert(meshlink_channel_send(server, channels[i], large_data, 0) == 0); + } + // Assert that any larger packets are not allowed assert(meshlink_channel_send(server, channels[0], large_data, 65536) == -1); @@ -159,21 +129,23 @@ int main(void) { for(int j = 0; j < 2500; j++) { for(int i = 0; i < 3; i++) { - assert(meshlink_channel_send(server, channels[i], data, sizeof(data)) == sizeof(data)); + size_t result = meshlink_channel_send(server, channels[i], data, sizeof(data)); + assert(result == sizeof(data)); } const struct timespec req = {0, 2000000}; clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL); } - // Let the clients close the channels + // Close the channels for(int i = 0; i < 3; i++) { - meshlink_channel_close(clients[i].mesh, clients[i].channel); - meshlink_set_node_status_cb(clients[i].mesh, NULL); + meshlink_channel_close(server, channels[i]); } - assert(wait_sync_flag(&close_flag, 10)); + for(int i = 0; i < 3; i++) { + assert(wait_sync_flag(&clients[i].close_flag, 10)); + } // Check that the clients have received (most of) the data @@ -181,12 +153,16 @@ int main(void) { fprintf(stderr, "%s received %zu\n", clients[i].mesh->name, clients[i].received); } + bool got_large_packet = false; + for(int i = 0; i < 3; i++) { assert(clients[i].received >= 1000000); assert(clients[i].received <= 1345536); - assert(clients[i].got_large_packet); + got_large_packet |= clients[i].got_large_packet; } + assert(got_large_packet); + // Clean up. for(int i = 0; i < 3; i++) { -- 2.39.5