From: Guus Sliepen Date: Sun, 18 Oct 2015 11:53:20 +0000 (+0200) Subject: Add a receive buffer. X-Git-Url: https://git.meshlink.io/?a=commitdiff_plain;h=fb2d3fbb1fe0538ef0f137ed88322f5b3de4707f;p=utcp Add a receive buffer. The receive buffer kicks in the moment we get a packet which is out of order. We store the packet in the buffer, and keep track of up to 4 ranges of bytes of received data. When retransmission fills the first gap, we send all the buffered data (up to the second gap if applicable) to the application. 4 byte ranges seems to be a good value for up to moderate (20%) packet loss. This algorithm greatly reduces the amount of useless packets being sent. A future improvement is sending the SACK information in the ACK packets, so the congestion window can be kept large while avoiding packets being resent unnecessarily. --- diff --git a/utcp.c b/utcp.c index 9434c70..2b2db97 100644 --- a/utcp.c +++ b/utcp.c @@ -53,6 +53,10 @@ } while (0) #endif +#ifndef max +#define max(a, b) ((a) > (b) ? (a) : (b)) +#endif + #ifdef UTCP_DEBUG #include @@ -300,6 +304,11 @@ static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t s return NULL; } + if(!buffer_init(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) { + free(c); + return NULL; + } + // Fill in the details c->src = src; @@ -369,6 +378,7 @@ void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { static void ack(struct utcp_connection *c, bool sendatleastone) { int32_t left = seqdiff(c->snd.last, c->snd.nxt); int32_t cwndleft = c->snd.cwnd - seqdiff(c->snd.nxt, c->snd.una); + debug("cwndleft = %d\n", cwndleft); assert(left >= 0); @@ -528,6 +538,7 @@ static void retransmit(struct utcp_connection *c) { pkt->hdr.ctl |= FIN; } c->snd.nxt = c->snd.una + len; + c->snd.cwnd = utcp->mtu; // reduce cwnd on retransmit buffer_copy(&c->sndbuf, pkt->data, 0, len); print_packet(c->utcp, "rtrx", pkt, sizeof pkt->hdr + len); utcp->send(utcp, pkt, sizeof pkt->hdr + len); @@ -548,6 +559,114 @@ static void retransmit(struct utcp_connection *c) { free(pkt); } +// Update receive buffer and SACK entries after consuming data. +static void sack_consume(struct utcp_connection *c, size_t len) { + debug("sack_consume %zu\n", len); + if(len > c->rcvbuf.used) + abort(); + + buffer_get(&c->rcvbuf, NULL, len); + + for(int i = 0; i < NSACKS && c->sacks[i].len; ) { + if(len < c->sacks[i].offset) { + c->sacks[i].offset -= len; + i++; + } else if(len < c->sacks[i].offset + c->sacks[i].len) { + c->sacks[i].offset = 0; + c->sacks[i].len -= len - c->sacks[i].offset; + i++; + } else { + if(i < NSACKS - 1) { + memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof c->sacks[i]); + c->sacks[i + 1].len = 0; + } else { + c->sacks[i].len = 0; + break; + } + } + } + + for(int i = 0; i < NSACKS && c->sacks[i].len; i++) + debug("SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); +} + +static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) { + debug("out of order packet, offset %u\n", offset); + // Packet loss or reordering occured. Store the data in the buffer. + ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len); + if(rxd < len) + abort(); + + // Make note of where we put it. + for(int i = 0; i < NSACKS; i++) { + if(!c->sacks[i].len) { // nothing to merge, add new entry + debug("New SACK entry %d\n", i); + c->sacks[i].offset = offset; + c->sacks[i].len = rxd; + break; + } else if(offset < c->sacks[i].offset) { + if(offset + rxd < c->sacks[i].offset) { // insert before + if(!c->sacks[NSACKS - 1].len) { // only if room left + debug("Insert SACK entry at %d\n", i); + memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof c->sacks[i]); + c->sacks[i].offset = offset; + c->sacks[i].len = rxd; + } + break; + } else { // merge + debug("Merge with start of SACK entry at %d\n", i); + c->sacks[i].offset = offset; + break; + } + } else if(offset <= c->sacks[i].offset + c->sacks[i].len) { + if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge + debug("Merge with end of SACK entry at %d\n", i); + c->sacks[i].len = offset + rxd - c->sacks[i].offset; + // TODO: handle potential merge with next entry + } + break; + } + } + + for(int i = 0; i < NSACKS && c->sacks[i].len; i++) + debug("SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); +} + +static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { + // Check if we can process out-of-order data now. + if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK + debug("incoming packet len %zu connected with SACK at %u\n", len, c->sacks[0].offset); + buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value + len = max(len, c->sacks[0].offset + c->sacks[0].len); + data = c->rcvbuf.data; + } + + if(c->recv) { + ssize_t rxd = c->recv(c, data, len); + if(rxd != len) { + // TODO: handle the application not accepting all data. + abort(); + } + } + + if(c->rcvbuf.used) + sack_consume(c, len); + + c->rcv.nxt += len; +} + + +static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) { + uint32_t offset = seqdiff(seq, c->rcv.nxt); + if(offset + len > c->rcvbuf.maxsize) + abort(); + + if(offset) + handle_out_of_order(c, offset, data, len); + else + handle_in_order(c, data, len); +} + ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { if(!utcp) { @@ -678,28 +797,19 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { acceptable = true; // TODO: handle packets overlapping c->rcv.nxt. -#if 0 +#if 1 // Only use this when accepting out-of-order packets. else if(len == 0) - if(c->rcv.wnd == 0) - acceptable = hdr.seq == c->rcv.nxt; - else - acceptable = (seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt + c->rcv.wnd) < 0); + acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0; else - if(c->rcv.wnd == 0) - // We don't accept data when the receive window is zero. - acceptable = false; - else - // Both start and end of packet must be within the receive window - acceptable = (seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt + c->rcv.wnd) < 0) - || (seqdiff(hdr.seq + len + 1, c->rcv.nxt) >= 0 && seqdiff(hdr.seq + len - 1, c->rcv.nxt + c->rcv.wnd) < 0); + acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize; #else if(c->state != SYN_SENT) acceptable = hdr.seq == c->rcv.nxt; #endif if(!acceptable) { - debug("Packet not acceptable, %u <= %u + %zu < %u\n", c->rcv.nxt, hdr.seq, len, c->rcv.nxt + c->rcv.wnd); + debug("Packet not acceptable, %u <= %u + %zu < %u\n", c->rcv.nxt, hdr.seq, len, c->rcv.nxt + c->rcvbuf.maxsize); // Ignore unacceptable RST packets. if(hdr.ctl & RST) return 0; @@ -711,10 +821,14 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { c->snd.wnd = hdr.wnd; // TODO: move below // 1c. Drop packets with an invalid ACK. - // ackno should not roll back, and it should also not be bigger than snd.nxt. - - if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.nxt) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) { - debug("Packet ack seqno out of range, %u %u %u\n", hdr.ack, c->snd.una, c->snd.nxt); + // ackno should not roll back, and it should also not be bigger than what we ever could have sent + // (= snd.una + c->sndbuf.used). + + if(hdr.ctl & ACK && + ((seqdiff(hdr.ack, c->snd.una + c->sndbuf.used) > 0 && + seqdiff(hdr.ack, c->snd.nxt) > 0) // TODO: simplify this if + || seqdiff(hdr.ack, c->snd.una) < 0)) { + debug("Packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used); // Ignore unacceptable RST packets. if(hdr.ctl & RST) return 0; @@ -800,6 +914,10 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { if(data_acked) buffer_get(&c->sndbuf, NULL, data_acked); + // Also advance snd.nxt if possible + if(seqdiff(c->snd.nxt, hdr.ack) < 0) + c->snd.nxt = hdr.ack; + c->snd.una = hdr.ack; c->dupack = 0; @@ -830,9 +948,9 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { debug("Triplicate ACK\n"); //TODO: Resend one packet and go to fast recovery mode. See RFC 6582. //We do a very simple variant here; reset the nxt pointer to the last acknowledged packet from the peer. - //This will cause us to start retransmitting, but at the same speed as the incoming ACKs arrive, - //thus preventing a drop in speed. + //Reset the congestion window so we wait for ACKs. c->snd.nxt = c->snd.una; + c->snd.cwnd = utcp->mtu; } } } @@ -923,28 +1041,12 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { return 0; } - ssize_t rxd; - - if(c->recv) { - rxd = c->recv(c, data, len); - if(rxd != len) { - // TODO: once we have a receive buffer, handle the application not accepting all data. - abort(); - } - if(rxd < 0) - rxd = 0; - else if(rxd > len) - rxd = len; // Bad application, bad! - } else { - rxd = len; - } - - c->rcv.nxt += len; + handle_incoming_data(c, hdr.seq, data, len); } // 7. Process FIN stuff - if(hdr.ctl & FIN) { + if((hdr.ctl & FIN) && hdr.seq + len == c->rcv.nxt) { switch(c->state) { case SYN_SENT: case SYN_RECEIVED: @@ -1277,6 +1379,25 @@ void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { c->sndbuf.maxsize = -1; } +size_t utcp_get_rcvbuf(struct utcp_connection *c) { + return c ? c->rcvbuf.maxsize : 0; +} + +size_t utcp_get_rcvbuf_free(struct utcp_connection *c) { + if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) + return buffer_free(&c->rcvbuf); + else + return 0; +} + +void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) { + if(!c) + return; + c->rcvbuf.maxsize = size; + if(c->rcvbuf.maxsize != size) + c->rcvbuf.maxsize = -1; +} + bool utcp_get_nodelay(struct utcp_connection *c) { return c ? c->nodelay : false; } diff --git a/utcp.h b/utcp.h index a29ef96..334e0bf 100644 --- a/utcp.h +++ b/utcp.h @@ -80,9 +80,12 @@ extern void utcp_set_mtu(struct utcp *utcp, uint16_t mtu); extern size_t utcp_get_sndbuf(struct utcp_connection *connection); extern void utcp_set_sndbuf(struct utcp_connection *connection, size_t size); - extern size_t utcp_get_sndbuf_free(struct utcp_connection *connection); +extern size_t utcp_get_rcvbuf(struct utcp_connection *connection); +extern void utcp_set_rcvbuf(struct utcp_connection *connection, size_t size); +extern size_t utcp_get_rcvbuf_free(struct utcp_connection *connection); + extern bool utcp_get_nodelay(struct utcp_connection *connection); extern void utcp_set_nodelay(struct utcp_connection *connection, bool nodelay); diff --git a/utcp_priv.h b/utcp_priv.h index b997953..69f1b0d 100644 --- a/utcp_priv.h +++ b/utcp_priv.h @@ -30,8 +30,11 @@ #define FIN 4 #define RST 8 +#define NSACKS 4 #define DEFAULT_SNDBUFSIZE 4096 #define DEFAULT_MAXSNDBUFSIZE 131072 +#define DEFAULT_RCVBUFSIZE 0 +#define DEFAULT_MAXRCVBUFSIZE 131072 struct hdr { uint16_t src; // Source port @@ -78,6 +81,11 @@ struct buffer { uint32_t maxsize; }; +struct sack { + uint32_t offset; + uint32_t len; +}; + struct utcp_connection { void *priv; struct utcp *utcp; @@ -118,9 +126,11 @@ struct utcp_connection { struct timeval conn_timeout; struct timeval rtrx_timeout; - // Send buffer + // Buffers struct buffer sndbuf; + struct buffer rcvbuf; + struct sack sacks[NSACKS]; // Per-socket options