X-Git-Url: http://git.meshlink.io/?p=utcp;a=blobdiff_plain;f=utcp.c;h=04d2b194c20f193aa44f2d4006b9afc1bd356b29;hp=19f6cfaf098377bc748de022bdfe506912796482;hb=HEAD;hpb=29d79607fc285618f8441c8810632aa79ceade15 diff --git a/utcp.c b/utcp.c index 19f6cfa..04d2b19 100644 --- a/utcp.c +++ b/utcp.c @@ -268,7 +268,7 @@ static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data uint32_t realoffset = buf->offset + offset; - if(buf->size - buf->offset < offset) { + if(buf->size - buf->offset <= offset) { // The offset wrapped realoffset -= buf->size; } @@ -305,7 +305,7 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t uint32_t realoffset = buf->offset + offset; - if(buf->size - buf->offset < offset) { + if(buf->size - buf->offset <= offset) { // The offset wrapped realoffset -= buf->size; } @@ -322,7 +322,11 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t } // Copy data from the buffer without removing it. -static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) { +static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) { + if(!c->recv) { + return len; + } + // Ensure we don't copy more than is actually stored in the buffer if(offset >= buf->used) { return 0; @@ -334,20 +338,25 @@ static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t uint32_t realoffset = buf->offset + offset; - if(buf->size - buf->offset < offset) { + if(buf->size - buf->offset <= offset) { // The offset wrapped realoffset -= buf->size; } if(buf->size - realoffset < len) { // The data is wrapped - ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset); + ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset); if(rx1 < buf->size - realoffset) { return rx1; } - ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset)); + // The channel might have been closed by the previous callback + if(!c->recv) { + return len; + } + + ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset)); if(rx2 < 0) { return rx2; @@ -355,7 +364,7 @@ static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t return rx1 + rx2; } } else { - return cb(arg, buf->data + realoffset, len); + return c->recv(c, buf->data + realoffset, len); } } @@ -365,7 +374,7 @@ static ssize_t buffer_discard(struct buffer *buf, size_t len) { len = buf->used; } - if(buf->size - buf->offset < len) { + if(buf->size - buf->offset <= len) { buf->offset -= buf->size; } @@ -890,6 +899,10 @@ static void retransmit(struct utcp_connection *c) { struct utcp *utcp = c->utcp; + if(utcp->retransmit) { + utcp->retransmit(c); + } + struct { struct hdr hdr; uint8_t data[]; @@ -1106,13 +1119,11 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t len = c->sacks[0].offset + c->sacks[0].len; size_t remainder = len - offset; - if(c->recv) { - ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder); + ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder); - if(rxd != (ssize_t)remainder) { - // TODO: handle the application not accepting all data. - abort(); - } + if(rxd != (ssize_t)remainder) { + // TODO: handle the application not accepting all data. + abort(); } } } @@ -1157,8 +1168,8 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, } // Send the packet if it's the final fragment - if(!(hdr->ctl & MF) && c->recv) { - buffer_call(&c->rcvbuf, c->recv, c, 0, hdr->wnd + len); + if(!(hdr->ctl & MF)) { + buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len); } c->rcv.nxt = hdr->seq + len; @@ -1348,6 +1359,8 @@ synack: print_packet(c, "send", &pkt, sizeof(hdr)); utcp->send(utcp, &pkt, sizeof(hdr)); } + + start_retransmit_timer(c); } else { // No, we don't want your packets, send a RST back len = 1; @@ -1709,7 +1722,7 @@ skip_ack: } c->rcv.irs = hdr.seq; - c->rcv.nxt = hdr.seq; + c->rcv.nxt = hdr.seq + 1; if(c->shut_wr) { c->snd.last++; @@ -1718,7 +1731,6 @@ skip_ack: set_state(c, ESTABLISHED); } - // TODO: notify application of this somehow. break; case SYN_RECEIVED: @@ -1732,8 +1744,8 @@ skip_ack: case CLOSING: case LAST_ACK: case TIME_WAIT: - // Ehm, no. We should never receive a second SYN. - return 0; + // This could be a retransmission. Ignore the SYN flag, but send an ACK back. + break; default: #ifdef UTCP_DEBUG @@ -1741,9 +1753,6 @@ skip_ack: #endif return 0; } - - // SYN counts as one sequence number - c->rcv.nxt++; } // 6. Process new data @@ -2165,6 +2174,13 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_ return NULL; } + utcp_set_mtu(utcp, DEFAULT_MTU); + + if(!utcp->pkt) { + free(utcp); + return NULL; + } + if(!CLOCK_GRANULARITY) { struct timespec res; clock_getres(UTCP_CLOCK, &res); @@ -2175,7 +2191,6 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_ utcp->pre_accept = pre_accept; utcp->send = send; utcp->priv = priv; - utcp_set_mtu(utcp, DEFAULT_MTU); utcp->timeout = DEFAULT_USER_TIMEOUT; // sec return utcp; @@ -2447,6 +2462,10 @@ void utcp_offline(struct utcp *utcp, bool offline) { } } +void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit) { + utcp->retransmit = retransmit; +} + void utcp_set_clock_granularity(long granularity) { CLOCK_GRANULARITY = granularity; }