]> git.meshlink.io Git - meshlink/blobdiff - src/utcp.c
Implement MESHLINK_CHANNEL_FRAMED.
[meshlink] / src / utcp.c
index c4305cd4de4978d265155f2dcb537cd746e39251..2a1ac5b86f370ec319b833445a24c918f3bc4402 100644 (file)
@@ -195,6 +195,10 @@ static bool is_reliable(struct utcp_connection *c) {
        return c->flags & UTCP_RELIABLE;
 }
 
+static bool is_framed(struct utcp_connection *c) {
+       return c->flags & UTCP_FRAMED;
+}
+
 static int32_t seqdiff(uint32_t a, uint32_t b) {
        return a - b;
 }
@@ -599,6 +603,19 @@ static void start_retransmit_timer(struct utcp_connection *c) {
        debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
 }
 
+static void start_flush_timer(struct utcp_connection *c) {
+       clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
+
+       c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;
+
+       if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
+               c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
+               c->rtrx_timeout.tv_sec++;
+       }
+
+       debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
+}
+
 static void stop_retransmit_timer(struct utcp_connection *c) {
        timespec_clear(&c->rtrx_timeout);
        debug(c, "rtrx_timeout cleared\n");
@@ -735,108 +752,263 @@ static void ack(struct utcp_connection *c, bool sendatleastone) {
        } while(left);
 }
 
-ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
-       if(c->reapable) {
-               debug(c, "send() called on closed connection\n");
-               errno = EBADF;
-               return -1;
+static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
+       size_t rlen = len + (is_framed(c) ? 2 : 0);
+
+       if(!rlen) {
+               return 0;
        }
 
-       switch(c->state) {
-       case CLOSED:
-       case LISTEN:
-               debug(c, "send() called on unconnected connection\n");
-               errno = ENOTCONN;
-               return -1;
+       // Check if we need to be able to buffer all data
 
-       case SYN_SENT:
-       case SYN_RECEIVED:
-       case ESTABLISHED:
-       case CLOSE_WAIT:
-               break;
+       if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
+               if(rlen > c->sndbuf.maxsize) {
+                       errno = EMSGSIZE;
+                       return -1;
+               }
 
-       case FIN_WAIT_1:
-       case FIN_WAIT_2:
-       case CLOSING:
-       case LAST_ACK:
-       case TIME_WAIT:
-               debug(c, "send() called on closed connection\n");
-               errno = EPIPE;
-               return -1;
+               if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
+                       errno = EMSGSIZE;
+                       return -1;
+               }
+
+               if(rlen > buffer_free(&c->sndbuf)) {
+                       errno = EWOULDBLOCK;
+                       return 0;
+               }
        }
 
-       // Exit early if we have nothing to send.
+       // Add data to the send buffer.
 
-       if(!len) {
-               return 0;
+       if(is_framed(c)) {
+               uint16_t len16 = len;
+               buffer_put(&c->sndbuf, &len16, sizeof(len16));
+               assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
+       } else {
+               len = buffer_put(&c->sndbuf, data, len);
+
+               if(len <= 0) {
+                       errno = EWOULDBLOCK;
+                       return 0;
+               }
        }
 
-       if(!data) {
-               errno = EFAULT;
-               return -1;
+       c->snd.last += rlen;
+
+       // Don't send anything yet if the connection has not fully established yet
+
+       if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
+               return len;
        }
 
-       // Check if we need to be able to buffer all data
+       ack(c, false);
 
-       if(c->flags & UTCP_NO_PARTIAL) {
-               if(len > buffer_free(&c->sndbuf)) {
-                       if(len > c->sndbuf.maxsize) {
-                               errno = EMSGSIZE;
-                               return -1;
-                       } else {
-                               errno = EWOULDBLOCK;
-                               return 0;
-                       }
-               }
+       if(!timespec_isset(&c->rtrx_timeout)) {
+               start_retransmit_timer(c);
        }
 
-       // Add data to send buffer.
+       if(!timespec_isset(&c->conn_timeout)) {
+               clock_gettime(UTCP_CLOCK, &c->conn_timeout);
+               c->conn_timeout.tv_sec += c->utcp->timeout;
+       }
 
-       if(is_reliable(c)) {
-               len = buffer_put(&c->sndbuf, data, len);
-       } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) {
-               if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) {
-                       errno = EMSGSIZE;
-                       return -1;
+       return len;
+}
+
+
+/* In the send buffer we can have multiple frames, each prefixed with their
+   length. However, the first frame might already have been partially sent. The
+   variable c->frame_offset tracks how much of a partial frame is left at the
+   start. If it is 0, it means there is no partial frame, and the first two
+   bytes in the send buffer are the length of the first frame.
+
+   After sending an MSS sized packet, we need to calculate the new frame_offset
+   value, since it is likely that the next packet will also have a partial frame
+   at the start. We do this by scanning the previously sent packet for frame
+   headers, to find out how many bytes of the last frame are left to send.
+*/
+static void ack_unreliable_framed(struct utcp_connection *c) {
+       int32_t left = seqdiff(c->snd.last, c->snd.nxt);
+       assert(left > 0);
+
+       struct {
+               struct hdr hdr;
+               uint8_t data[];
+       } *pkt = c->utcp->pkt;
+
+       pkt->hdr.src = c->src;
+       pkt->hdr.dst = c->dst;
+       pkt->hdr.ack = c->rcv.nxt;
+       pkt->hdr.ctl = ACK | MF;
+       pkt->hdr.aux = 0;
+
+       bool sent_packet = false;
+
+       while(left >= c->utcp->mss) {
+               pkt->hdr.wnd = c->frame_offset;
+               uint32_t seglen = c->utcp->mss;
+
+               pkt->hdr.seq = c->snd.nxt;
+
+               buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
+
+               c->snd.nxt += seglen;
+               c->snd.una = c->snd.nxt;
+               left -= seglen;
+
+               print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
+               c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
+               sent_packet = true;
+
+               // Calculate the new frame offset
+               while(c->frame_offset < seglen) {
+                       uint16_t framelen;
+                       buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
+                       c->frame_offset += framelen + 2;
                }
-       } else {
-               return 0;
+
+               buffer_discard(&c->sndbuf, seglen);
+               c->frame_offset -= seglen;
+       };
+
+       if(sent_packet) {
+               if(left) {
+                       // We sent one packet but we have partial data left, (re)start the flush timer
+                       start_flush_timer(c);
+               } else {
+                       // There is no partial data in the send buffer, so stop the flush timer
+                       stop_retransmit_timer(c);
+               }
+       }
+}
+
+static void flush_unreliable_framed(struct utcp_connection *c) {
+       int32_t left = seqdiff(c->snd.last, c->snd.nxt);
+
+       /* If the MSS dropped since last time ack_unreliable_frame() was called,
+         we might now have more than one segment worth of data left.
+       */
+       if(left > c->utcp->mss) {
+               ack_unreliable_framed(c);
+               left = seqdiff(c->snd.last, c->snd.nxt);
+               assert(left <= c->utcp->mss);
        }
 
-       if(len <= 0) {
-               if(is_reliable(c)) {
+       if(left) {
+               struct {
+                       struct hdr hdr;
+                       uint8_t data[];
+               } *pkt = c->utcp->pkt;
+
+               pkt->hdr.src = c->src;
+               pkt->hdr.dst = c->dst;
+               pkt->hdr.seq = c->snd.nxt;
+               pkt->hdr.ack = c->rcv.nxt;
+               pkt->hdr.wnd = c->frame_offset;
+               pkt->hdr.ctl = ACK | MF;
+               pkt->hdr.aux = 0;
+
+               uint32_t seglen = left;
+
+               buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
+               buffer_discard(&c->sndbuf, seglen);
+
+               c->snd.nxt += seglen;
+               c->snd.una = c->snd.nxt;
+
+               print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
+               c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
+       }
+
+       c->frame_offset = 0;
+       stop_retransmit_timer(c);
+}
+
+
+static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
+       if(len > MAX_UNRELIABLE_SIZE) {
+               errno = EMSGSIZE;
+               return -1;
+       }
+
+       size_t rlen = len + (is_framed(c) ? 2 : 0);
+
+       if(rlen > buffer_free(&c->sndbuf)) {
+               if(rlen > c->sndbuf.maxsize) {
+                       errno = EMSGSIZE;
+                       return -1;
+               } else {
                        errno = EWOULDBLOCK;
                        return 0;
-               } else {
-                       return len;
                }
        }
 
-       c->snd.last += len;
-
        // Don't send anything yet if the connection has not fully established yet
 
        if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
                return len;
        }
 
-       ack(c, false);
+       if(is_framed(c)) {
+               uint16_t framelen = len;
+               buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
+       }
 
-       if(!is_reliable(c)) {
+       buffer_put(&c->sndbuf, data, len);
+
+       c->snd.last += rlen;
+
+       if(is_framed(c)) {
+               ack_unreliable_framed(c);
+       } else {
+               ack(c, false);
                c->snd.una = c->snd.nxt = c->snd.last;
                buffer_discard(&c->sndbuf, c->sndbuf.used);
        }
 
-       if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) {
-               start_retransmit_timer(c);
+       return len;
+}
+
+ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
+       if(c->reapable) {
+               debug(c, "send() called on closed connection\n");
+               errno = EBADF;
+               return -1;
        }
 
-       if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) {
-               clock_gettime(UTCP_CLOCK, &c->conn_timeout);
-               c->conn_timeout.tv_sec += c->utcp->timeout;
+       switch(c->state) {
+       case CLOSED:
+       case LISTEN:
+               debug(c, "send() called on unconnected connection\n");
+               errno = ENOTCONN;
+               return -1;
+
+       case SYN_SENT:
+       case SYN_RECEIVED:
+       case ESTABLISHED:
+       case CLOSE_WAIT:
+               break;
+
+       case FIN_WAIT_1:
+       case FIN_WAIT_2:
+       case CLOSING:
+       case LAST_ACK:
+       case TIME_WAIT:
+               debug(c, "send() called on closed connection\n");
+               errno = EPIPE;
+               return -1;
        }
 
-       return len;
+       if(!data && len) {
+               errno = EFAULT;
+               return -1;
+       }
+
+       if(is_reliable(c)) {
+               return utcp_send_reliable(c, data, len);
+       } else {
+               return utcp_send_unreliable(c, data, len);
+       }
 }
 
 static void swap_ports(struct hdr *hdr) {
@@ -899,7 +1071,7 @@ static void retransmit(struct utcp_connection *c) {
 
        struct utcp *utcp = c->utcp;
 
-       if(utcp->retransmit) {
+       if(utcp->retransmit && is_reliable(c)) {
                utcp->retransmit(c);
        }
 
@@ -942,6 +1114,11 @@ static void retransmit(struct utcp_connection *c) {
        case CLOSE_WAIT:
        case CLOSING:
        case LAST_ACK:
+               if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
+                       flush_unreliable_framed(c);
+                       return;
+               }
+
                // Send unacked data again.
                pkt->hdr.seq = c->snd.una;
                pkt->hdr.ack = c->rcv.nxt;
@@ -1100,6 +1277,13 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons
        }
 }
 
+static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
+       uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
+
+       // Put the data into the receive buffer
+       handle_out_of_order(c, offset + in_order_offset, data, len);
+}
+
 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
        if(c->recv) {
                ssize_t rxd = c->recv(c, data, len);
@@ -1135,6 +1319,50 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t
        c->rcv.nxt += len;
 }
 
+static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
+       // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
+       uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
+       handle_out_of_order(c, in_order_offset, data, len);
+
+       // While we have full frames at the start, give them to the application
+       while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
+               uint16_t framelen;
+               buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
+
+               if(framelen > c->sacks[0].len - 2) {
+                       break;
+               }
+
+               if(c->recv) {
+                       ssize_t rxd;
+                       uint32_t realoffset = c->rcvbuf.offset + 2;
+
+                       if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
+                               realoffset -= c->rcvbuf.size;
+                       }
+
+                       if(realoffset > c->rcvbuf.size - framelen) {
+                               // The buffer wraps, we need to copy
+                               uint8_t buf[framelen];
+                               buffer_copy(&c->rcvbuf, buf, 2, framelen);
+                               rxd = c->recv(c, buf, framelen);
+                       } else {
+                               // The frame is contiguous in the receive buffer
+                               rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
+                       }
+
+                       if(rxd != (ssize_t)framelen) {
+                               // TODO: handle the application not accepting all data.
+                               abort();
+                       }
+               }
+
+               sack_consume(c, framelen + 2);
+       }
+
+       c->rcv.nxt += len;
+}
+
 static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
        // Fast path for unfragmented packets
        if(!hdr->wnd && !(hdr->ctl & MF)) {
@@ -1175,18 +1403,87 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr,
        c->rcv.nxt = hdr->seq + len;
 }
 
+static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
+       bool in_order = hdr->seq == c->rcv.nxt;
+       c->rcv.nxt = hdr->seq + len;
+
+       const uint8_t *ptr = data;
+       size_t left = len;
+
+       // Does it start with a partial frame?
+       if(hdr->wnd) {
+               // Only accept the data if it is in order
+               if(in_order && c->rcvbuf.used) {
+                       // In order, append it to the receive buffer
+                       buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
+
+                       if(hdr->wnd <= len) {
+                               // We have a full frame
+                               c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
+                       }
+               }
+
+               // Exit early if there is other data in this frame
+               if(hdr->wnd > len) {
+                       if(!in_order) {
+                               buffer_clear(&c->rcvbuf);
+                       }
+
+                       return;
+               }
+
+               ptr += hdr->wnd;
+               left -= hdr->wnd;
+       }
+
+       // We now start with new frames, so clear any data in the receive buffer
+       buffer_clear(&c->rcvbuf);
+
+       // Handle whole frames
+       while(left > 2) {
+               uint16_t framelen;
+               memcpy(&framelen, ptr, sizeof(framelen));
+
+               if(left <= (size_t)framelen + 2) {
+                       break;
+               }
+
+               c->recv(c, ptr + 2, framelen);
+               ptr += framelen + 2;
+               left -= framelen + 2;
+       }
+
+       // Handle partial last frame
+       if(left) {
+               buffer_put(&c->rcvbuf, ptr, left);
+       }
+}
+
 static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
        if(!is_reliable(c)) {
-               handle_unreliable(c, hdr, data, len);
+               if(is_framed(c)) {
+                       handle_unreliable_framed(c, hdr, data, len);
+               } else {
+                       handle_unreliable(c, hdr, data, len);
+               }
+
                return;
        }
 
        uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
 
-       if(offset) {
-               handle_out_of_order(c, offset, data, len);
+       if(is_framed(c)) {
+               if(offset) {
+                       handle_out_of_order_framed(c, offset, data, len);
+               } else {
+                       handle_in_order_framed(c, data, len);
+               }
        } else {
-               handle_in_order(c, data, len);
+               if(offset) {
+                       handle_out_of_order(c, offset, data, len);
+               } else {
+                       handle_in_order(c, data, len);
+               }
        }
 }
 
@@ -1447,7 +1744,7 @@ synack:
                int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
 
                if(rcv_offset) {
-                       debug(c, "packet out of order, offset %u bytes", rcv_offset);
+                       debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
                }
 
 #endif
@@ -1938,6 +2235,10 @@ int utcp_shutdown(struct utcp_connection *c, int dir) {
 
        case SYN_RECEIVED:
        case ESTABLISHED:
+               if(!is_reliable(c) && is_framed(c)) {
+                       flush_unreliable_framed(c);
+               }
+
                set_state(c, FIN_WAIT_1);
                break;
 
@@ -1957,7 +2258,7 @@ int utcp_shutdown(struct utcp_connection *c, int dir) {
 
        c->snd.last++;
 
-       ack(c, false);
+       ack(c, !is_reliable(c));
 
        if(!timespec_isset(&c->rtrx_timeout)) {
                start_retransmit_timer(c);
@@ -2052,8 +2353,10 @@ void utcp_abort_all_connections(struct utcp *utcp) {
 }
 
 int utcp_close(struct utcp_connection *c) {
+       debug(c, "closing\n");
+
        if(c->rcvbuf.used) {
-               fprintf(stderr, "UTCP channel closed with stuff in receive buffer\n");
+               debug(c, "receive buffer not empty, resetting\n");
                return reset_connection(c) ? 0 : -1;
        }
 
@@ -2474,3 +2777,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
 void utcp_set_clock_granularity(long granularity) {
        CLOCK_GRANULARITY = granularity;
 }
+
+int utcp_get_flush_timeout(struct utcp *utcp) {
+       return utcp->flush_timeout;
+}
+
+void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
+       utcp->flush_timeout = milliseconds;
+}