X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=utcp.c;h=f7c1859de2abe5ef05873bb1ef6565d449810e3d;hb=a6ea3a99259b6f74c3151d0692d2899b3657b17d;hp=3d0971a37ba359716533ddaeab9b03874473d510;hpb=372fb44a395d6ded38d6e52472a0f2f330c232c3;p=utcp diff --git a/utcp.c b/utcp.c index 3d0971a..f7c1859 100644 --- a/utcp.c +++ b/utcp.c @@ -61,8 +61,7 @@ static void timespec_sub(const struct timespec *a, const struct timespec *b, str } static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) { - int64_t diff = (a->tv_sec - b->tv_sec) * 1000000000 + a->tv_sec - b->tv_sec; - return diff / 1000; + return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000; } static bool timespec_lt(const struct timespec *a, const struct timespec *b) { @@ -75,6 +74,7 @@ static bool timespec_lt(const struct timespec *a, const struct timespec *b) { static void timespec_clear(struct timespec *a) { a->tv_sec = 0; + a->tv_nsec = 0; } static bool timespec_isset(const struct timespec *a) { @@ -145,12 +145,13 @@ static void print_packet(struct utcp_connection *c, const char *dir, const void *p = 0; - debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s data %s\n", + debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n", dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux, hdr.ctl & SYN ? "SYN" : "", hdr.ctl & RST ? "RST" : "", hdr.ctl & FIN ? "FIN" : "", hdr.ctl & ACK ? "ACK" : "", + hdr.ctl & MF ? "MF" : "", str ); } @@ -320,6 +321,44 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t return len; } +// Copy data from the buffer without removing it. +static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) { + // Ensure we don't copy more than is actually stored in the buffer + if(offset >= buf->used) { + return 0; + } + + if(buf->used - offset < len) { + len = buf->used - offset; + } + + uint32_t realoffset = buf->offset + offset; + + if(buf->size - buf->offset < offset) { + // The offset wrapped + realoffset -= buf->size; + } + + if(buf->size - realoffset < len) { + // The data is wrapped + ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset); + + if(rx1 < buf->size - realoffset) { + return rx1; + } + + ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset)); + + if(rx2 < 0) { + return rx2; + } else { + return rx1 + rx2; + } + } else { + return cb(arg, buf->data + realoffset, len); + } +} + // Discard data from the buffer. static ssize_t buffer_discard(struct buffer *buf, size_t len) { if(buf->used < len) { @@ -330,12 +369,22 @@ static ssize_t buffer_discard(struct buffer *buf, size_t len) { buf->offset -= buf->size; } - buf->offset += len; + if(buf->used == len) { + buf->offset = 0; + } else { + buf->offset += len; + } + buf->used -= len; return len; } +static void buffer_clear(struct buffer *buf) { + buf->used = 0; + buf->offset = 0; +} + static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) { if(maxsize < minsize) { maxsize = minsize; @@ -352,7 +401,7 @@ static void buffer_exit(struct buffer *buf) { } static uint32_t buffer_free(const struct buffer *buf) { - return buf->maxsize - buf->used; + return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0; } // Connections are stored in a sorted list. @@ -476,6 +525,9 @@ static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t s c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss; c->snd.ssthresh = ~0; debug_cwnd(c); + c->srtt = 0; + c->rttvar = 0; + c->rto = START_RTO; c->utcp = utcp; // Add it to the sorted list of connections @@ -501,36 +553,34 @@ static void update_rtt(struct utcp_connection *c, uint32_t rtt) { return; } - struct utcp *utcp = c->utcp; - - if(!utcp->srtt) { - utcp->srtt = rtt; - utcp->rttvar = rtt / 2; + if(!c->srtt) { + c->srtt = rtt; + c->rttvar = rtt / 2; } else { - utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4; - utcp->srtt = (utcp->srtt * 7 + rtt) / 8; + c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4; + c->srtt = (c->srtt * 7 + rtt) / 8; } - utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY); + c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY); - if(utcp->rto > MAX_RTO) { - utcp->rto = MAX_RTO; + if(c->rto > MAX_RTO) { + c->rto = MAX_RTO; } - debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, utcp->srtt, utcp->rttvar, utcp->rto); + debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto); } static void start_retransmit_timer(struct utcp_connection *c) { clock_gettime(UTCP_CLOCK, &c->rtrx_timeout); - uint32_t rto = c->utcp->rto; + uint32_t rto = c->rto; while(rto > USEC_PER_SEC) { c->rtrx_timeout.tv_sec++; rto -= USEC_PER_SEC; } - c->rtrx_timeout.tv_nsec += c->utcp->rto * 1000; + c->rtrx_timeout.tv_nsec += rto * 1000; if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) { c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC; @@ -606,7 +656,7 @@ void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { static void ack(struct utcp_connection *c, bool sendatleastone) { int32_t left = seqdiff(c->snd.last, c->snd.nxt); - int32_t cwndleft = min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una); + int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE; assert(left >= 0); @@ -634,7 +684,7 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { pkt->hdr.src = c->src; pkt->hdr.dst = c->dst; pkt->hdr.ack = c->rcv.nxt; - pkt->hdr.wnd = c->rcvbuf.maxsize; + pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0; pkt->hdr.ctl = ACK; pkt->hdr.aux = 0; @@ -647,6 +697,14 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { c->snd.nxt += seglen; left -= seglen; + if(!is_reliable(c)) { + if(left) { + pkt->hdr.ctl |= MF; + } else { + pkt->hdr.ctl &= ~MF; + } + } + if(seglen && fin_wanted(c, c->snd.nxt)) { seglen--; pkt->hdr.ctl |= FIN; @@ -661,6 +719,10 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + + if(left && !is_reliable(c)) { + pkt->hdr.wnd += seglen; + } } while(left); } @@ -721,8 +783,13 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { // Add data to send buffer. - if(is_reliable(c) || (c->state != SYN_SENT && c->state != SYN_RECEIVED)) { + if(is_reliable(c)) { len = buffer_put(&c->sndbuf, data, len); + } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { + if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) { + errno = EMSGSIZE; + return -1; + } } else { return 0; } @@ -749,7 +816,6 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { if(!is_reliable(c)) { c->snd.una = c->snd.nxt = c->snd.last; buffer_discard(&c->sndbuf, c->sndbuf.used); - c->do_poll = true; } if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { @@ -908,10 +974,10 @@ static void retransmit(struct utcp_connection *c) { } start_retransmit_timer(c); - utcp->rto *= 2; + c->rto *= 2; - if(utcp->rto > MAX_RTO) { - utcp->rto = MAX_RTO; + if(c->rto > MAX_RTO) { + c->rto = MAX_RTO; } c->rtt_start.tv_sec = 0; // invalidate RTT timer @@ -979,8 +1045,14 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons // Packet loss or reordering occured. Store the data in the buffer. ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len); - if(rxd < 0 || (size_t)rxd < len) { - abort(); + if(rxd <= 0) { + debug(c, "packet outside receive buffer, dropping\n"); + return; + } + + if((size_t)rxd < len) { + debug(c, "packet partially outside receive buffer\n"); + len = rxd; } // Make note of where we put it. @@ -1024,23 +1096,35 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons } static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { - // Check if we can process out-of-order data now. - if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK - debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset); - buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value - len = max(len, c->sacks[0].offset + c->sacks[0].len); - data = c->rcvbuf.data; - } - if(c->recv) { ssize_t rxd = c->recv(c, data, len); - if(rxd < 0 || (size_t)rxd != len) { + if(rxd != (ssize_t)len) { // TODO: handle the application not accepting all data. abort(); } } + // Check if we can process out-of-order data now. + if(c->sacks[0].len && len >= c->sacks[0].offset) { + debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset); + + if(len < c->sacks[0].offset + c->sacks[0].len) { + size_t offset = len; + len = c->sacks[0].offset + c->sacks[0].len; + size_t remainder = len - offset; + + if(c->recv) { + ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder); + + if(rxd != (ssize_t)remainder) { + // TODO: handle the application not accepting all data. + abort(); + } + } + } + } + if(c->rcvbuf.used) { sack_consume(c, len); } @@ -1048,20 +1132,54 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t c->rcv.nxt += len; } +static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { + // Fast path for unfragmented packets + if(!hdr->wnd && !(hdr->ctl & MF)) { + if(c->recv) { + c->recv(c, data, len); + } -static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) { - if(!is_reliable(c)) { - c->recv(c, data, len); - c->rcv.nxt = seq + len; + c->rcv.nxt = hdr->seq + len; return; } - uint32_t offset = seqdiff(seq, c->rcv.nxt); + // Ensure reassembled packet are not larger than 64 kiB + if(hdr->wnd >= MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) { + return; + } - if(offset + len > c->rcvbuf.maxsize) { - abort(); + // Don't accept out of order fragments + if(hdr->wnd && hdr->seq != c->rcv.nxt) { + return; + } + + // Reset the receive buffer for the first fragment + if(!hdr->wnd) { + buffer_clear(&c->rcvbuf); + } + + ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len); + + if(rxd != (ssize_t)len) { + return; + } + + // Send the packet if it's the final fragment + if(!(hdr->ctl & MF) && c->recv) { + buffer_call(&c->rcvbuf, c->recv, c, 0, hdr->wnd + len); + } + + c->rcv.nxt = hdr->seq + len; +} + +static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { + if(!is_reliable(c)) { + handle_unreliable(c, hdr, data, len); + return; } + uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt); + if(offset) { handle_out_of_order(c, offset, data, len); } else { @@ -1113,7 +1231,7 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { // Drop packets with an unknown CTL flag - if(hdr.ctl & ~(SYN | ACK | RST | FIN)) { + if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) { print_packet(NULL, "recv", data, len); errno = EBADMSG; return -1; @@ -1327,10 +1445,6 @@ synack: debug(c, "packet out of order, offset %u bytes", rcv_offset); } - if(rcv_offset >= 0) { - c->rcv.nxt = hdr.seq + len; - } - #endif } @@ -1485,7 +1599,10 @@ synack: if(data_acked) { buffer_discard(&c->sndbuf, data_acked); - c->do_poll = true; + + if(is_reliable(c)) { + c->do_poll = true; + } } // Also advance snd.nxt if possible @@ -1686,7 +1803,7 @@ skip_ack: return 0; } - handle_incoming_data(c, hdr.seq, ptr, len); + handle_incoming_data(c, &hdr, ptr, len); } // 7. Process FIN stuff @@ -2068,7 +2185,6 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_ utcp->priv = priv; utcp_set_mtu(utcp, DEFAULT_MTU); utcp->timeout = DEFAULT_USER_TIMEOUT; // sec - utcp->rto = START_RTO; // usec return utcp; } @@ -2097,6 +2213,7 @@ void utcp_exit(struct utcp *utcp) { } free(utcp->connections); + free(utcp->pkt); free(utcp); } @@ -2160,10 +2277,10 @@ void utcp_reset_timers(struct utcp *utcp) { } c->rtt_start.tv_sec = 0; - } - if(utcp->rto > START_RTO) { - utcp->rto = START_RTO; + if(c->rto > START_RTO) { + c->rto = START_RTO; + } } } @@ -2209,7 +2326,7 @@ void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { c->sndbuf.maxsize = -1; } - c->do_poll = buffer_free(&c->sndbuf); + c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); } size_t utcp_get_rcvbuf(struct utcp_connection *c) { @@ -2277,7 +2394,7 @@ void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) { void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) { if(c) { c->poll = poll; - c->do_poll = buffer_free(&c->sndbuf); + c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); } } @@ -2330,11 +2447,11 @@ void utcp_offline(struct utcp *utcp, bool offline) { } utcp->connections[i]->rtt_start.tv_sec = 0; - } - } - if(!offline && utcp->rto > START_RTO) { - utcp->rto = START_RTO; + if(c->rto > START_RTO) { + c->rto = START_RTO; + } + } } }