X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=utcp.c;h=ad19982ba95afdb613eb780e6ac8d8eff6511ee3;hb=7d8a65c11dd4b0555575ff75efab859236c4964f;hp=e6f6cf9ede28360df84fad0161b0a993e01a3c0f;hpb=ea89f59bebc6529e3270eebcc86825ee1eb7055e;p=utcp diff --git a/utcp.c b/utcp.c index e6f6cf9..ad19982 100644 --- a/utcp.c +++ b/utcp.c @@ -53,6 +53,10 @@ } while (0) #endif +#ifndef max +#define max(a, b) ((a) > (b) ? (a) : (b)) +#endif + #ifdef UTCP_DEBUG #include @@ -197,9 +201,11 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t static bool buffer_init(struct buffer *buf, uint32_t len, uint32_t maxlen) { memset(buf, 0, sizeof *buf); - buf->data = malloc(len); - if(!len) - return false; + if(len) { + buf->data = malloc(len); + if(!buf->data) + return false; + } buf->size = len; buf->maxsize = maxlen; return true; @@ -298,6 +304,11 @@ static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t s return NULL; } + if(!buffer_init(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) { + free(c); + return NULL; + } + // Fill in the details c->src = src; @@ -367,6 +378,7 @@ void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { static void ack(struct utcp_connection *c, bool sendatleastone) { int32_t left = seqdiff(c->snd.last, c->snd.nxt); int32_t cwndleft = c->snd.cwnd - seqdiff(c->snd.nxt, c->snd.una); + debug("cwndleft = %d\n", cwndleft); assert(left >= 0); @@ -526,6 +538,7 @@ static void retransmit(struct utcp_connection *c) { pkt->hdr.ctl |= FIN; } c->snd.nxt = c->snd.una + len; + c->snd.cwnd = utcp->mtu; // reduce cwnd on retransmit buffer_copy(&c->sndbuf, pkt->data, 0, len); print_packet(c->utcp, "rtrx", pkt, sizeof pkt->hdr + len); utcp->send(utcp, pkt, sizeof pkt->hdr + len); @@ -546,6 +559,114 @@ static void retransmit(struct utcp_connection *c) { free(pkt); } +// Update receive buffer and SACK entries after consuming data. +static void sack_consume(struct utcp_connection *c, size_t len) { + debug("sack_consume %zu\n", len); + if(len > c->rcvbuf.used) + abort(); + + buffer_get(&c->rcvbuf, NULL, len); + + for(int i = 0; i < NSACKS && c->sacks[i].len; ) { + if(len < c->sacks[i].offset) { + c->sacks[i].offset -= len; + i++; + } else if(len < c->sacks[i].offset + c->sacks[i].len) { + c->sacks[i].offset = 0; + c->sacks[i].len -= len - c->sacks[i].offset; + i++; + } else { + if(i < NSACKS - 1) { + memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof c->sacks[i]); + c->sacks[i + 1].len = 0; + } else { + c->sacks[i].len = 0; + break; + } + } + } + + for(int i = 0; i < NSACKS && c->sacks[i].len; i++) + debug("SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); +} + +static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) { + debug("out of order packet, offset %u\n", offset); + // Packet loss or reordering occured. Store the data in the buffer. + ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len); + if(rxd < len) + abort(); + + // Make note of where we put it. + for(int i = 0; i < NSACKS; i++) { + if(!c->sacks[i].len) { // nothing to merge, add new entry + debug("New SACK entry %d\n", i); + c->sacks[i].offset = offset; + c->sacks[i].len = rxd; + break; + } else if(offset < c->sacks[i].offset) { + if(offset + rxd < c->sacks[i].offset) { // insert before + if(!c->sacks[NSACKS - 1].len) { // only if room left + debug("Insert SACK entry at %d\n", i); + memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof c->sacks[i]); + c->sacks[i].offset = offset; + c->sacks[i].len = rxd; + } + break; + } else { // merge + debug("Merge with start of SACK entry at %d\n", i); + c->sacks[i].offset = offset; + break; + } + } else if(offset <= c->sacks[i].offset + c->sacks[i].len) { + if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge + debug("Merge with end of SACK entry at %d\n", i); + c->sacks[i].len = offset + rxd - c->sacks[i].offset; + // TODO: handle potential merge with next entry + } + break; + } + } + + for(int i = 0; i < NSACKS && c->sacks[i].len; i++) + debug("SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); +} + +static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { + // Check if we can process out-of-order data now. + if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK + debug("incoming packet len %zu connected with SACK at %u\n", len, c->sacks[0].offset); + buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value + len = max(len, c->sacks[0].offset + c->sacks[0].len); + data = c->rcvbuf.data; + } + + if(c->recv) { + ssize_t rxd = c->recv(c, data, len); + if(rxd != len) { + // TODO: handle the application not accepting all data. + abort(); + } + } + + if(c->rcvbuf.used) + sack_consume(c, len); + + c->rcv.nxt += len; +} + + +static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) { + uint32_t offset = seqdiff(seq, c->rcv.nxt); + if(offset + len > c->rcvbuf.maxsize) + abort(); + + if(offset) + handle_out_of_order(c, offset, data, len); + else + handle_in_order(c, data, len); +} + ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { if(!utcp) { @@ -676,28 +797,19 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { acceptable = true; // TODO: handle packets overlapping c->rcv.nxt. -#if 0 +#if 1 // Only use this when accepting out-of-order packets. else if(len == 0) - if(c->rcv.wnd == 0) - acceptable = hdr.seq == c->rcv.nxt; - else - acceptable = (seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt + c->rcv.wnd) < 0); + acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0; else - if(c->rcv.wnd == 0) - // We don't accept data when the receive window is zero. - acceptable = false; - else - // Both start and end of packet must be within the receive window - acceptable = (seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt + c->rcv.wnd) < 0) - || (seqdiff(hdr.seq + len + 1, c->rcv.nxt) >= 0 && seqdiff(hdr.seq + len - 1, c->rcv.nxt + c->rcv.wnd) < 0); + acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize; #else if(c->state != SYN_SENT) acceptable = hdr.seq == c->rcv.nxt; #endif if(!acceptable) { - debug("Packet not acceptable, %u <= %u + %zu < %u\n", c->rcv.nxt, hdr.seq, len, c->rcv.nxt + c->rcv.wnd); + debug("Packet not acceptable, %u <= %u + %zu < %u\n", c->rcv.nxt, hdr.seq, len, c->rcv.nxt + c->rcvbuf.maxsize); // Ignore unacceptable RST packets. if(hdr.ctl & RST) return 0; @@ -709,10 +821,14 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { c->snd.wnd = hdr.wnd; // TODO: move below // 1c. Drop packets with an invalid ACK. - // ackno should not roll back, and it should also not be bigger than snd.nxt. - - if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.nxt) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) { - debug("Packet ack seqno out of range, %u %u %u\n", hdr.ack, c->snd.una, c->snd.nxt); + // ackno should not roll back, and it should also not be bigger than what we ever could have sent + // (= snd.una + c->sndbuf.used). + + if(hdr.ctl & ACK && + ((seqdiff(hdr.ack, c->snd.una + c->sndbuf.used) > 0 && + seqdiff(hdr.ack, c->snd.nxt) > 0) // TODO: simplify this if + || seqdiff(hdr.ack, c->snd.una) < 0)) { + debug("Packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used); // Ignore unacceptable RST packets. if(hdr.ctl & RST) return 0; @@ -798,6 +914,10 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { if(data_acked) buffer_get(&c->sndbuf, NULL, data_acked); + // Also advance snd.nxt if possible + if(seqdiff(c->snd.nxt, hdr.ack) < 0) + c->snd.nxt = hdr.ack; + c->snd.una = hdr.ack; c->dupack = 0; @@ -828,9 +948,9 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { debug("Triplicate ACK\n"); //TODO: Resend one packet and go to fast recovery mode. See RFC 6582. //We do a very simple variant here; reset the nxt pointer to the last acknowledged packet from the peer. - //This will cause us to start retransmitting, but at the same speed as the incoming ACKs arrive, - //thus preventing a drop in speed. + //Reset the congestion window so we wait for ACKs. c->snd.nxt = c->snd.una; + c->snd.cwnd = utcp->mtu; } } } @@ -921,28 +1041,12 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { return 0; } - ssize_t rxd; - - if(c->recv) { - rxd = c->recv(c, data, len); - if(rxd != len) { - // TODO: once we have a receive buffer, handle the application not accepting all data. - abort(); - } - if(rxd < 0) - rxd = 0; - else if(rxd > len) - rxd = len; // Bad application, bad! - } else { - rxd = len; - } - - c->rcv.nxt += len; + handle_incoming_data(c, hdr.seq, data, len); } // 7. Process FIN stuff - if(hdr.ctl & FIN) { + if((hdr.ctl & FIN) && hdr.seq + len == c->rcv.nxt) { switch(c->state) { case SYN_SENT: case SYN_RECEIVED: @@ -1205,15 +1309,15 @@ bool utcp_is_active(struct utcp *utcp) { } struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) { - struct utcp *utcp = calloc(1, sizeof *utcp); - if(!utcp) - return NULL; - if(!send) { errno = EFAULT; return NULL; } + struct utcp *utcp = calloc(1, sizeof *utcp); + if(!utcp) + return NULL; + utcp->accept = accept; utcp->pre_accept = pre_accept; utcp->send = send; @@ -1275,6 +1379,25 @@ void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { c->sndbuf.maxsize = -1; } +size_t utcp_get_rcvbuf(struct utcp_connection *c) { + return c ? c->rcvbuf.maxsize : 0; +} + +size_t utcp_get_rcvbuf_free(struct utcp_connection *c) { + if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) + return buffer_free(&c->rcvbuf); + else + return 0; +} + +void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) { + if(!c) + return; + c->rcvbuf.maxsize = size; + if(c->rcvbuf.maxsize != size) + c->rcvbuf.maxsize = -1; +} + bool utcp_get_nodelay(struct utcp_connection *c) { return c ? c->nodelay : false; }