X-Git-Url: http://git.meshlink.io/?p=utcp;a=blobdiff_plain;f=utcp.c;h=04d2b194c20f193aa44f2d4006b9afc1bd356b29;hp=10d026efabf2679db6ca419c71131e5fecd7e582;hb=HEAD;hpb=9470c449236afef22210803b359fe44d74e32636 diff --git a/utcp.c b/utcp.c index 10d026e..04d2b19 100644 --- a/utcp.c +++ b/utcp.c @@ -74,6 +74,7 @@ static bool timespec_lt(const struct timespec *a, const struct timespec *b) { static void timespec_clear(struct timespec *a) { a->tv_sec = 0; + a->tv_nsec = 0; } static bool timespec_isset(const struct timespec *a) { @@ -267,7 +268,7 @@ static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data uint32_t realoffset = buf->offset + offset; - if(buf->size - buf->offset < offset) { + if(buf->size - buf->offset <= offset) { // The offset wrapped realoffset -= buf->size; } @@ -304,7 +305,7 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t uint32_t realoffset = buf->offset + offset; - if(buf->size - buf->offset < offset) { + if(buf->size - buf->offset <= offset) { // The offset wrapped realoffset -= buf->size; } @@ -321,7 +322,11 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t } // Copy data from the buffer without removing it. -static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) { +static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) { + if(!c->recv) { + return len; + } + // Ensure we don't copy more than is actually stored in the buffer if(offset >= buf->used) { return 0; @@ -333,20 +338,25 @@ static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t uint32_t realoffset = buf->offset + offset; - if(buf->size - buf->offset < offset) { + if(buf->size - buf->offset <= offset) { // The offset wrapped realoffset -= buf->size; } if(buf->size - realoffset < len) { // The data is wrapped - ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset); + ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset); if(rx1 < buf->size - realoffset) { return rx1; } - ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset)); + // The channel might have been closed by the previous callback + if(!c->recv) { + return len; + } + + ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset)); if(rx2 < 0) { return rx2; @@ -354,7 +364,7 @@ static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t return rx1 + rx2; } } else { - return cb(arg, buf->data + realoffset, len); + return c->recv(c, buf->data + realoffset, len); } } @@ -364,7 +374,7 @@ static ssize_t buffer_discard(struct buffer *buf, size_t len) { len = buf->used; } - if(buf->size - buf->offset < len) { + if(buf->size - buf->offset <= len) { buf->offset -= buf->size; } @@ -400,7 +410,7 @@ static void buffer_exit(struct buffer *buf) { } static uint32_t buffer_free(const struct buffer *buf) { - return buf->maxsize - buf->used; + return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0; } // Connections are stored in a sorted list. @@ -524,6 +534,9 @@ static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t s c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss; c->snd.ssthresh = ~0; debug_cwnd(c); + c->srtt = 0; + c->rttvar = 0; + c->rto = START_RTO; c->utcp = utcp; // Add it to the sorted list of connections @@ -549,36 +562,34 @@ static void update_rtt(struct utcp_connection *c, uint32_t rtt) { return; } - struct utcp *utcp = c->utcp; - - if(!utcp->srtt) { - utcp->srtt = rtt; - utcp->rttvar = rtt / 2; + if(!c->srtt) { + c->srtt = rtt; + c->rttvar = rtt / 2; } else { - utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4; - utcp->srtt = (utcp->srtt * 7 + rtt) / 8; + c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4; + c->srtt = (c->srtt * 7 + rtt) / 8; } - utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY); + c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY); - if(utcp->rto > MAX_RTO) { - utcp->rto = MAX_RTO; + if(c->rto > MAX_RTO) { + c->rto = MAX_RTO; } - debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, utcp->srtt, utcp->rttvar, utcp->rto); + debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto); } static void start_retransmit_timer(struct utcp_connection *c) { clock_gettime(UTCP_CLOCK, &c->rtrx_timeout); - uint32_t rto = c->utcp->rto; + uint32_t rto = c->rto; while(rto > USEC_PER_SEC) { c->rtrx_timeout.tv_sec++; rto -= USEC_PER_SEC; } - c->rtrx_timeout.tv_nsec += c->utcp->rto * 1000; + c->rtrx_timeout.tv_nsec += rto * 1000; if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) { c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC; @@ -814,7 +825,6 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { if(!is_reliable(c)) { c->snd.una = c->snd.nxt = c->snd.last; buffer_discard(&c->sndbuf, c->sndbuf.used); - c->do_poll = true; } if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { @@ -846,13 +856,7 @@ static void fast_retransmit(struct utcp_connection *c) { struct { struct hdr hdr; uint8_t data[]; - } *pkt; - - pkt = malloc(c->utcp->mtu); - - if(!pkt) { - return; - } + } *pkt = c->utcp->pkt; pkt->hdr.src = c->src; pkt->hdr.dst = c->dst; @@ -884,8 +888,6 @@ static void fast_retransmit(struct utcp_connection *c) { default: break; } - - free(pkt); } static void retransmit(struct utcp_connection *c) { @@ -897,6 +899,10 @@ static void retransmit(struct utcp_connection *c) { struct utcp *utcp = c->utcp; + if(utcp->retransmit) { + utcp->retransmit(c); + } + struct { struct hdr hdr; uint8_t data[]; @@ -973,10 +979,10 @@ static void retransmit(struct utcp_connection *c) { } start_retransmit_timer(c); - utcp->rto *= 2; + c->rto *= 2; - if(utcp->rto > MAX_RTO) { - utcp->rto = MAX_RTO; + if(c->rto > MAX_RTO) { + c->rto = MAX_RTO; } c->rtt_start.tv_sec = 0; // invalidate RTT timer @@ -1044,8 +1050,14 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons // Packet loss or reordering occured. Store the data in the buffer. ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len); - if(rxd < 0 || (size_t)rxd < len) { - abort(); + if(rxd <= 0) { + debug(c, "packet outside receive buffer, dropping\n"); + return; + } + + if((size_t)rxd < len) { + debug(c, "packet partially outside receive buffer\n"); + len = rxd; } // Make note of where we put it. @@ -1106,7 +1118,8 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t size_t offset = len; len = c->sacks[0].offset + c->sacks[0].len; size_t remainder = len - offset; - ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder); + + ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder); if(rxd != (ssize_t)remainder) { // TODO: handle the application not accepting all data. @@ -1125,7 +1138,10 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { // Fast path for unfragmented packets if(!hdr->wnd && !(hdr->ctl & MF)) { - c->recv(c, data, len); + if(c->recv) { + c->recv(c, data, len); + } + c->rcv.nxt = hdr->seq + len; return; } @@ -1153,7 +1169,7 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, // Send the packet if it's the final fragment if(!(hdr->ctl & MF)) { - buffer_call(&c->rcvbuf, c->recv, c, 0, hdr->wnd + len); + buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len); } c->rcv.nxt = hdr->seq + len; @@ -1167,10 +1183,6 @@ static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hd uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt); - if(offset + len > c->rcvbuf.maxsize) { - abort(); - } - if(offset) { handle_out_of_order(c, offset, data, len); } else { @@ -1347,6 +1359,8 @@ synack: print_packet(c, "send", &pkt, sizeof(hdr)); utcp->send(utcp, &pkt, sizeof(hdr)); } + + start_retransmit_timer(c); } else { // No, we don't want your packets, send a RST back len = 1; @@ -1590,7 +1604,10 @@ synack: if(data_acked) { buffer_discard(&c->sndbuf, data_acked); - c->do_poll = true; + + if(is_reliable(c)) { + c->do_poll = true; + } } // Also advance snd.nxt if possible @@ -1705,7 +1722,7 @@ skip_ack: } c->rcv.irs = hdr.seq; - c->rcv.nxt = hdr.seq; + c->rcv.nxt = hdr.seq + 1; if(c->shut_wr) { c->snd.last++; @@ -1714,7 +1731,6 @@ skip_ack: set_state(c, ESTABLISHED); } - // TODO: notify application of this somehow. break; case SYN_RECEIVED: @@ -1728,8 +1744,8 @@ skip_ack: case CLOSING: case LAST_ACK: case TIME_WAIT: - // Ehm, no. We should never receive a second SYN. - return 0; + // This could be a retransmission. Ignore the SYN flag, but send an ACK back. + break; default: #ifdef UTCP_DEBUG @@ -1737,9 +1753,6 @@ skip_ack: #endif return 0; } - - // SYN counts as one sequence number - c->rcv.nxt++; } // 6. Process new data @@ -2161,6 +2174,13 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_ return NULL; } + utcp_set_mtu(utcp, DEFAULT_MTU); + + if(!utcp->pkt) { + free(utcp); + return NULL; + } + if(!CLOCK_GRANULARITY) { struct timespec res; clock_getres(UTCP_CLOCK, &res); @@ -2171,9 +2191,7 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_ utcp->pre_accept = pre_accept; utcp->send = send; utcp->priv = priv; - utcp_set_mtu(utcp, DEFAULT_MTU); utcp->timeout = DEFAULT_USER_TIMEOUT; // sec - utcp->rto = START_RTO; // usec return utcp; } @@ -2266,10 +2284,10 @@ void utcp_reset_timers(struct utcp *utcp) { } c->rtt_start.tv_sec = 0; - } - if(utcp->rto > START_RTO) { - utcp->rto = START_RTO; + if(c->rto > START_RTO) { + c->rto = START_RTO; + } } } @@ -2315,7 +2333,7 @@ void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { c->sndbuf.maxsize = -1; } - c->do_poll = buffer_free(&c->sndbuf); + c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); } size_t utcp_get_rcvbuf(struct utcp_connection *c) { @@ -2383,7 +2401,7 @@ void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) { void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) { if(c) { c->poll = poll; - c->do_poll = buffer_free(&c->sndbuf); + c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); } } @@ -2436,12 +2454,16 @@ void utcp_offline(struct utcp *utcp, bool offline) { } utcp->connections[i]->rtt_start.tv_sec = 0; + + if(c->rto > START_RTO) { + c->rto = START_RTO; + } } } +} - if(!offline && utcp->rto > START_RTO) { - utcp->rto = START_RTO; - } +void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit) { + utcp->retransmit = retransmit; } void utcp_set_clock_granularity(long granularity) {