X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=src%2Futcp.c;fp=src%2Futcp.c;h=2a1ac5b86f370ec319b833445a24c918f3bc4402;hb=d90d88adfbfc340e8eb473e0265f2741bf3f40a4;hp=c4305cd4de4978d265155f2dcb537cd746e39251;hpb=36a7403d3ab239a03c4085fa46f0df2127394a95;p=meshlink diff --git a/src/utcp.c b/src/utcp.c index c4305cd4..2a1ac5b8 100644 --- a/src/utcp.c +++ b/src/utcp.c @@ -195,6 +195,10 @@ static bool is_reliable(struct utcp_connection *c) { return c->flags & UTCP_RELIABLE; } +static bool is_framed(struct utcp_connection *c) { + return c->flags & UTCP_FRAMED; +} + static int32_t seqdiff(uint32_t a, uint32_t b) { return a - b; } @@ -599,6 +603,19 @@ static void start_retransmit_timer(struct utcp_connection *c) { debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec); } +static void start_flush_timer(struct utcp_connection *c) { + clock_gettime(UTCP_CLOCK, &c->rtrx_timeout); + + c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000; + + if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) { + c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC; + c->rtrx_timeout.tv_sec++; + } + + debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec); +} + static void stop_retransmit_timer(struct utcp_connection *c) { timespec_clear(&c->rtrx_timeout); debug(c, "rtrx_timeout cleared\n"); @@ -735,108 +752,263 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { } while(left); } -ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { - if(c->reapable) { - debug(c, "send() called on closed connection\n"); - errno = EBADF; - return -1; +static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) { + size_t rlen = len + (is_framed(c) ? 2 : 0); + + if(!rlen) { + return 0; } - switch(c->state) { - case CLOSED: - case LISTEN: - debug(c, "send() called on unconnected connection\n"); - errno = ENOTCONN; - return -1; + // Check if we need to be able to buffer all data - case SYN_SENT: - case SYN_RECEIVED: - case ESTABLISHED: - case CLOSE_WAIT: - break; + if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) { + if(rlen > c->sndbuf.maxsize) { + errno = EMSGSIZE; + return -1; + } - case FIN_WAIT_1: - case FIN_WAIT_2: - case CLOSING: - case LAST_ACK: - case TIME_WAIT: - debug(c, "send() called on closed connection\n"); - errno = EPIPE; - return -1; + if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) { + errno = EMSGSIZE; + return -1; + } + + if(rlen > buffer_free(&c->sndbuf)) { + errno = EWOULDBLOCK; + return 0; + } } - // Exit early if we have nothing to send. + // Add data to the send buffer. - if(!len) { - return 0; + if(is_framed(c)) { + uint16_t len16 = len; + buffer_put(&c->sndbuf, &len16, sizeof(len16)); + assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len); + } else { + len = buffer_put(&c->sndbuf, data, len); + + if(len <= 0) { + errno = EWOULDBLOCK; + return 0; + } } - if(!data) { - errno = EFAULT; - return -1; + c->snd.last += rlen; + + // Don't send anything yet if the connection has not fully established yet + + if(c->state == SYN_SENT || c->state == SYN_RECEIVED) { + return len; } - // Check if we need to be able to buffer all data + ack(c, false); - if(c->flags & UTCP_NO_PARTIAL) { - if(len > buffer_free(&c->sndbuf)) { - if(len > c->sndbuf.maxsize) { - errno = EMSGSIZE; - return -1; - } else { - errno = EWOULDBLOCK; - return 0; - } - } + if(!timespec_isset(&c->rtrx_timeout)) { + start_retransmit_timer(c); } - // Add data to send buffer. + if(!timespec_isset(&c->conn_timeout)) { + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += c->utcp->timeout; + } - if(is_reliable(c)) { - len = buffer_put(&c->sndbuf, data, len); - } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { - if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) { - errno = EMSGSIZE; - return -1; + return len; +} + + +/* In the send buffer we can have multiple frames, each prefixed with their + length. However, the first frame might already have been partially sent. The + variable c->frame_offset tracks how much of a partial frame is left at the + start. If it is 0, it means there is no partial frame, and the first two + bytes in the send buffer are the length of the first frame. + + After sending an MSS sized packet, we need to calculate the new frame_offset + value, since it is likely that the next packet will also have a partial frame + at the start. We do this by scanning the previously sent packet for frame + headers, to find out how many bytes of the last frame are left to send. +*/ +static void ack_unreliable_framed(struct utcp_connection *c) { + int32_t left = seqdiff(c->snd.last, c->snd.nxt); + assert(left > 0); + + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.ctl = ACK | MF; + pkt->hdr.aux = 0; + + bool sent_packet = false; + + while(left >= c->utcp->mss) { + pkt->hdr.wnd = c->frame_offset; + uint32_t seglen = c->utcp->mss; + + pkt->hdr.seq = c->snd.nxt; + + buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen); + + c->snd.nxt += seglen; + c->snd.una = c->snd.nxt; + left -= seglen; + + print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); + c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + sent_packet = true; + + // Calculate the new frame offset + while(c->frame_offset < seglen) { + uint16_t framelen; + buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen)); + c->frame_offset += framelen + 2; } - } else { - return 0; + + buffer_discard(&c->sndbuf, seglen); + c->frame_offset -= seglen; + }; + + if(sent_packet) { + if(left) { + // We sent one packet but we have partial data left, (re)start the flush timer + start_flush_timer(c); + } else { + // There is no partial data in the send buffer, so stop the flush timer + stop_retransmit_timer(c); + } + } +} + +static void flush_unreliable_framed(struct utcp_connection *c) { + int32_t left = seqdiff(c->snd.last, c->snd.nxt); + + /* If the MSS dropped since last time ack_unreliable_frame() was called, + we might now have more than one segment worth of data left. + */ + if(left > c->utcp->mss) { + ack_unreliable_framed(c); + left = seqdiff(c->snd.last, c->snd.nxt); + assert(left <= c->utcp->mss); } - if(len <= 0) { - if(is_reliable(c)) { + if(left) { + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.seq = c->snd.nxt; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.wnd = c->frame_offset; + pkt->hdr.ctl = ACK | MF; + pkt->hdr.aux = 0; + + uint32_t seglen = left; + + buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen); + buffer_discard(&c->sndbuf, seglen); + + c->snd.nxt += seglen; + c->snd.una = c->snd.nxt; + + print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); + c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + } + + c->frame_offset = 0; + stop_retransmit_timer(c); +} + + +static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) { + if(len > MAX_UNRELIABLE_SIZE) { + errno = EMSGSIZE; + return -1; + } + + size_t rlen = len + (is_framed(c) ? 2 : 0); + + if(rlen > buffer_free(&c->sndbuf)) { + if(rlen > c->sndbuf.maxsize) { + errno = EMSGSIZE; + return -1; + } else { errno = EWOULDBLOCK; return 0; - } else { - return len; } } - c->snd.last += len; - // Don't send anything yet if the connection has not fully established yet if(c->state == SYN_SENT || c->state == SYN_RECEIVED) { return len; } - ack(c, false); + if(is_framed(c)) { + uint16_t framelen = len; + buffer_put(&c->sndbuf, &framelen, sizeof(framelen)); + } - if(!is_reliable(c)) { + buffer_put(&c->sndbuf, data, len); + + c->snd.last += rlen; + + if(is_framed(c)) { + ack_unreliable_framed(c); + } else { + ack(c, false); c->snd.una = c->snd.nxt = c->snd.last; buffer_discard(&c->sndbuf, c->sndbuf.used); } - if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { - start_retransmit_timer(c); + return len; +} + +ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { + if(c->reapable) { + debug(c, "send() called on closed connection\n"); + errno = EBADF; + return -1; } - if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) { - clock_gettime(UTCP_CLOCK, &c->conn_timeout); - c->conn_timeout.tv_sec += c->utcp->timeout; + switch(c->state) { + case CLOSED: + case LISTEN: + debug(c, "send() called on unconnected connection\n"); + errno = ENOTCONN; + return -1; + + case SYN_SENT: + case SYN_RECEIVED: + case ESTABLISHED: + case CLOSE_WAIT: + break; + + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + debug(c, "send() called on closed connection\n"); + errno = EPIPE; + return -1; } - return len; + if(!data && len) { + errno = EFAULT; + return -1; + } + + if(is_reliable(c)) { + return utcp_send_reliable(c, data, len); + } else { + return utcp_send_unreliable(c, data, len); + } } static void swap_ports(struct hdr *hdr) { @@ -899,7 +1071,7 @@ static void retransmit(struct utcp_connection *c) { struct utcp *utcp = c->utcp; - if(utcp->retransmit) { + if(utcp->retransmit && is_reliable(c)) { utcp->retransmit(c); } @@ -942,6 +1114,11 @@ static void retransmit(struct utcp_connection *c) { case CLOSE_WAIT: case CLOSING: case LAST_ACK: + if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) { + flush_unreliable_framed(c); + return; + } + // Send unacked data again. pkt->hdr.seq = c->snd.una; pkt->hdr.ack = c->rcv.nxt; @@ -1100,6 +1277,13 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons } } +static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) { + uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0; + + // Put the data into the receive buffer + handle_out_of_order(c, offset + in_order_offset, data, len); +} + static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { if(c->recv) { ssize_t rxd = c->recv(c, data, len); @@ -1135,6 +1319,50 @@ static void handle_in_order(struct utcp_connection *c, const void *data, size_t c->rcv.nxt += len; } +static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) { + // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame. + uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0; + handle_out_of_order(c, in_order_offset, data, len); + + // While we have full frames at the start, give them to the application + while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) { + uint16_t framelen; + buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen)); + + if(framelen > c->sacks[0].len - 2) { + break; + } + + if(c->recv) { + ssize_t rxd; + uint32_t realoffset = c->rcvbuf.offset + 2; + + if(c->rcvbuf.size - c->rcvbuf.offset <= 2) { + realoffset -= c->rcvbuf.size; + } + + if(realoffset > c->rcvbuf.size - framelen) { + // The buffer wraps, we need to copy + uint8_t buf[framelen]; + buffer_copy(&c->rcvbuf, buf, 2, framelen); + rxd = c->recv(c, buf, framelen); + } else { + // The frame is contiguous in the receive buffer + rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen); + } + + if(rxd != (ssize_t)framelen) { + // TODO: handle the application not accepting all data. + abort(); + } + } + + sack_consume(c, framelen + 2); + } + + c->rcv.nxt += len; +} + static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { // Fast path for unfragmented packets if(!hdr->wnd && !(hdr->ctl & MF)) { @@ -1175,18 +1403,87 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, c->rcv.nxt = hdr->seq + len; } +static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { + bool in_order = hdr->seq == c->rcv.nxt; + c->rcv.nxt = hdr->seq + len; + + const uint8_t *ptr = data; + size_t left = len; + + // Does it start with a partial frame? + if(hdr->wnd) { + // Only accept the data if it is in order + if(in_order && c->rcvbuf.used) { + // In order, append it to the receive buffer + buffer_put(&c->rcvbuf, data, min(hdr->wnd, len)); + + if(hdr->wnd <= len) { + // We have a full frame + c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2); + } + } + + // Exit early if there is other data in this frame + if(hdr->wnd > len) { + if(!in_order) { + buffer_clear(&c->rcvbuf); + } + + return; + } + + ptr += hdr->wnd; + left -= hdr->wnd; + } + + // We now start with new frames, so clear any data in the receive buffer + buffer_clear(&c->rcvbuf); + + // Handle whole frames + while(left > 2) { + uint16_t framelen; + memcpy(&framelen, ptr, sizeof(framelen)); + + if(left <= (size_t)framelen + 2) { + break; + } + + c->recv(c, ptr + 2, framelen); + ptr += framelen + 2; + left -= framelen + 2; + } + + // Handle partial last frame + if(left) { + buffer_put(&c->rcvbuf, ptr, left); + } +} + static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { if(!is_reliable(c)) { - handle_unreliable(c, hdr, data, len); + if(is_framed(c)) { + handle_unreliable_framed(c, hdr, data, len); + } else { + handle_unreliable(c, hdr, data, len); + } + return; } uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt); - if(offset) { - handle_out_of_order(c, offset, data, len); + if(is_framed(c)) { + if(offset) { + handle_out_of_order_framed(c, offset, data, len); + } else { + handle_in_order_framed(c, data, len); + } } else { - handle_in_order(c, data, len); + if(offset) { + handle_out_of_order(c, offset, data, len); + } else { + handle_in_order(c, data, len); + } } } @@ -1447,7 +1744,7 @@ synack: int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); if(rcv_offset) { - debug(c, "packet out of order, offset %u bytes", rcv_offset); + debug(c, "packet out of order, offset %u bytes\n", rcv_offset); } #endif @@ -1938,6 +2235,10 @@ int utcp_shutdown(struct utcp_connection *c, int dir) { case SYN_RECEIVED: case ESTABLISHED: + if(!is_reliable(c) && is_framed(c)) { + flush_unreliable_framed(c); + } + set_state(c, FIN_WAIT_1); break; @@ -1957,7 +2258,7 @@ int utcp_shutdown(struct utcp_connection *c, int dir) { c->snd.last++; - ack(c, false); + ack(c, !is_reliable(c)); if(!timespec_isset(&c->rtrx_timeout)) { start_retransmit_timer(c); @@ -2052,8 +2353,10 @@ void utcp_abort_all_connections(struct utcp *utcp) { } int utcp_close(struct utcp_connection *c) { + debug(c, "closing\n"); + if(c->rcvbuf.used) { - fprintf(stderr, "UTCP channel closed with stuff in receive buffer\n"); + debug(c, "receive buffer not empty, resetting\n"); return reset_connection(c) ? 0 : -1; } @@ -2474,3 +2777,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) { void utcp_set_clock_granularity(long granularity) { CLOCK_GRANULARITY = granularity; } + +int utcp_get_flush_timeout(struct utcp *utcp) { + return utcp->flush_timeout; +} + +void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) { + utcp->flush_timeout = milliseconds; +}