return c->flags & UTCP_RELIABLE;
}
+// Tells whether the UTCP_FRAMED bit is set in the connection's flags.
+static bool is_framed(struct utcp_connection *c) {
+	return (c->flags & UTCP_FRAMED) != 0;
+}
+
// Signed difference a - b between two 32-bit sequence numbers; the
// subtraction wraps modulo 2^32 before being reinterpreted as signed.
static int32_t seqdiff(uint32_t a, uint32_t b) {
	uint32_t delta = a - b;
	return (int32_t)delta;
}
uint32_t realoffset = buf->offset + offset;
- if(buf->size - buf->offset < offset) {
+ if(buf->size - buf->offset <= offset) {
// The offset wrapped
realoffset -= buf->size;
}
uint32_t realoffset = buf->offset + offset;
- if(buf->size - buf->offset < offset) {
+ if(buf->size - buf->offset <= offset) {
// The offset wrapped
realoffset -= buf->size;
}
}
// Copy data from the buffer without removing it.
-static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) {
+static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
+ if(!c->recv) {
+ return len;
+ }
+
// Ensure we don't copy more than is actually stored in the buffer
if(offset >= buf->used) {
return 0;
uint32_t realoffset = buf->offset + offset;
- if(buf->size - buf->offset < offset) {
+ if(buf->size - buf->offset <= offset) {
// The offset wrapped
realoffset -= buf->size;
}
if(buf->size - realoffset < len) {
// The data is wrapped
- ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset);
+ ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);
if(rx1 < buf->size - realoffset) {
return rx1;
}
- ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset));
+ // The channel might have been closed by the previous callback
+ if(!c->recv) {
+ return len;
+ }
+
+ ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));
if(rx2 < 0) {
return rx2;
return rx1 + rx2;
}
} else {
- return cb(arg, buf->data + realoffset, len);
+ return c->recv(c, buf->data + realoffset, len);
}
}
len = buf->used;
}
- if(buf->size - buf->offset < len) {
+ if(buf->size - buf->offset <= len) {
buf->offset -= buf->size;
}
debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
}
+// Arm c->rtrx_timeout to expire flush_timeout milliseconds from now.
+//
+// The timeout is split into whole seconds and a sub-second remainder before
+// it is added, so that (non-negative) timeouts of one second or more neither
+// overflow the millisecond-to-nanosecond multiplication nor leave tv_nsec at
+// or above NSEC_PER_SEC.
+static void start_flush_timer(struct utcp_connection *c) {
+	clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
+
+	long timeout_ms = c->utcp->flush_timeout;
+
+	c->rtrx_timeout.tv_sec += timeout_ms / 1000;
+	c->rtrx_timeout.tv_nsec += (timeout_ms % 1000) * 1000000;
+
+	// One carry is enough: both nanosecond addends are below one second
+	if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
+		c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
+		c->rtrx_timeout.tv_sec++;
+	}
+
+	debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
+}
+
static void stop_retransmit_timer(struct utcp_connection *c) {
timespec_clear(&c->rtrx_timeout);
debug(c, "rtrx_timeout cleared\n");
debug(c, "accepted %p %p\n", c, recv, priv);
c->recv = recv;
c->priv = priv;
+ c->do_poll = true;
set_state(c, ESTABLISHED);
}
pkt->hdr.ctl |= FIN;
}
- if(!c->rtt_start.tv_sec) {
+ if(!c->rtt_start.tv_sec && is_reliable(c)) {
// Start RTT measurement
clock_gettime(UTCP_CLOCK, &c->rtt_start);
c->rtt_seq = pkt->hdr.seq + seglen;
} while(left);
}
-ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
- if(c->reapable) {
- debug(c, "send() called on closed connection\n");
- errno = EBADF;
- return -1;
+/* Queue data for transmission on a reliable connection.
+ *
+ * Framed connections get a 2-byte length header stored in front of the
+ * payload. With UTCP_NO_PARTIAL or UTCP_FRAMED set, the data is accepted
+ * completely or not at all; otherwise as much as buffer_put() accepts is
+ * queued.
+ *
+ * Returns the number of payload bytes queued. Returns 0 with errno set to
+ * EWOULDBLOCK when the send buffer has no room right now (or plain 0 when
+ * there is nothing to queue), and -1 with errno set to EMSGSIZE when the
+ * data exceeds the send buffer's maximum size or, for framed connections,
+ * MAX_UNRELIABLE_SIZE.
+ */
+static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
+	// rlen is the number of bytes that end up in the send buffer
+	size_t rlen = len + (is_framed(c) ? 2 : 0);
+
+	if(!rlen) {
+		return 0;
}
- switch(c->state) {
- case CLOSED:
- case LISTEN:
- debug(c, "send() called on unconnected connection\n");
- errno = ENOTCONN;
- return -1;
+	// Check if we need to be able to buffer all data
- case SYN_SENT:
- case SYN_RECEIVED:
- case ESTABLISHED:
- case CLOSE_WAIT:
- break;
+	if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
+		if(rlen > c->sndbuf.maxsize) {
+			errno = EMSGSIZE;
+			return -1;
+		}
- case FIN_WAIT_1:
- case FIN_WAIT_2:
- case CLOSING:
- case LAST_ACK:
- case TIME_WAIT:
- debug(c, "send() called on closed connection\n");
- errno = EPIPE;
- return -1;
+		if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
+			errno = EMSGSIZE;
+			return -1;
+		}
+
+		if(rlen > buffer_free(&c->sndbuf)) {
+			errno = EWOULDBLOCK;
+			return 0;
+		}
}
- // Exit early if we have nothing to send.
+	// Add data to the send buffer.
- if(!len) {
- return 0;
+	if(is_framed(c)) {
+		uint16_t len16 = len;
+		buffer_put(&c->sndbuf, &len16, sizeof(len16));
+		// Keep the call outside assert() so the payload is still queued when NDEBUG is defined
+		ssize_t stored = buffer_put(&c->sndbuf, data, len);
+		assert(stored == (ssize_t)len);
+		(void)stored;
+	} else {
+		// Use a signed result so that a negative return is caught as well
+		ssize_t stored = buffer_put(&c->sndbuf, data, len);
+
+		if(stored <= 0) {
+			errno = EWOULDBLOCK;
+			return 0;
+		}
+
+		// A partial put must only advance snd.last by what was really buffered
+		len = stored;
+		rlen = len;
}
- if(!data) {
- errno = EFAULT;
- return -1;
+	c->snd.last += rlen;
+
+	// Don't send anything yet if the connection has not fully established yet
+
+	if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
+		return len;
}
- // Check if we need to be able to buffer all data
+	ack(c, false);
- if(c->flags & UTCP_NO_PARTIAL) {
- if(len > buffer_free(&c->sndbuf)) {
- if(len > c->sndbuf.maxsize) {
- errno = EMSGSIZE;
- return -1;
- } else {
- errno = EWOULDBLOCK;
- return 0;
+	if(!timespec_isset(&c->rtrx_timeout)) {
+		start_retransmit_timer(c);
+	}
+
+	if(!timespec_isset(&c->conn_timeout)) {
+		clock_gettime(UTCP_CLOCK, &c->conn_timeout);
+		c->conn_timeout.tv_sec += c->utcp->timeout;
+	}
+
+	return len;
+}
+
+
+/* In the send buffer we can have multiple frames, each prefixed with their
+ length. However, the first frame might already have been partially sent. The
+ variable c->frame_offset tracks how much of a partial frame is left at the
+ start. If it is 0, it means there is no partial frame, and the first two
+ bytes in the send buffer are the length of the first frame.
+
+ After sending an MSS sized packet, we need to calculate the new frame_offset
+ value, since it is likely that the next packet will also have a partial frame
+ at the start. We do this by scanning the previously sent packet for frame
+ headers, to find out how many bytes of the last frame are left to send.
+*/
+// Send every full MSS-sized segment that is queued between snd.nxt and
+// snd.last, discard it from the send buffer, and start or stop the flush
+// timer depending on whether a smaller remainder is left behind.
+static void ack_unreliable_framed(struct utcp_connection *c) {
+ int32_t left = seqdiff(c->snd.last, c->snd.nxt);
+ assert(left > 0);
+
+ struct {
+ struct hdr hdr;
+ uint8_t data[];
+ } *pkt = c->utcp->pkt;
+
+ pkt->hdr.src = c->src;
+ pkt->hdr.dst = c->dst;
+ pkt->hdr.ack = c->rcv.nxt;
+ pkt->hdr.ctl = ACK | MF;
+ pkt->hdr.aux = 0;
+
+ bool sent_packet = false;
+
+ while(left >= c->utcp->mss) {
+ // The wnd field carries how many leading bytes belong to a frame begun in an earlier packet
+ pkt->hdr.wnd = c->frame_offset;
+ uint32_t seglen = c->utcp->mss;
+
+ pkt->hdr.seq = c->snd.nxt;
+
+ buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
+
+ c->snd.nxt += seglen;
+ // The segment is discarded from the buffer below, so snd.una moves along with snd.nxt
+ c->snd.una = c->snd.nxt;
+ left -= seglen;
+
+ print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
+ c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
+ sent_packet = true;
+
+ // Calculate the new frame offset
+ // Walk the frame headers inside the segment just sent until frame_offset points at or past its end
+ while(c->frame_offset < seglen) {
+ uint16_t framelen;
+ buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
+ c->frame_offset += framelen + 2;
+ }
+
+ buffer_discard(&c->sndbuf, seglen);
+ // Rebase frame_offset to the new start of the send buffer
+ c->frame_offset -= seglen;
+ };
+
+ if(sent_packet) {
+ if(left) {
+ // We sent one packet but we have partial data left, (re)start the flush timer
+ // flush_needed is only raised when the timer was not already running
+ if(!timespec_isset(&c->rtrx_timeout)) {
+ c->flush_needed = true;
}
+
+ start_flush_timer(c);
+ } else {
+ // There is no partial data in the send buffer, so stop the flush timer
+ stop_retransmit_timer(c);
}
+ } else if(left && !timespec_isset(&c->rtrx_timeout)) {
+ // We have partial data and we didn't start the flush timer yet
+ c->flush_needed = true;
+ start_flush_timer(c);
}
+}
- // Add data to send buffer.
+// Send whatever is still queued between snd.nxt and snd.last, including a
+// final segment shorter than the MSS, then reset frame_offset and clear the
+// timer.
+static void flush_unreliable_framed(struct utcp_connection *c) {
+ int32_t left = seqdiff(c->snd.last, c->snd.nxt);
- if(is_reliable(c)) {
- len = buffer_put(&c->sndbuf, data, len);
- } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) {
- if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) {
- errno = EMSGSIZE;
- return -1;
- }
- } else {
- return 0;
+ /* If the MSS dropped since last time ack_unreliable_framed() was called,
+ we might now have more than one segment worth of data left.
+ */
+ if(left > c->utcp->mss) {
+ ack_unreliable_framed(c);
+ left = seqdiff(c->snd.last, c->snd.nxt);
+ assert(left <= c->utcp->mss);
}
- if(len <= 0) {
- if(is_reliable(c)) {
+ // Send the remainder as one last segment
+ if(left) {
+ struct {
+ struct hdr hdr;
+ uint8_t data[];
+ } *pkt = c->utcp->pkt;
+
+ pkt->hdr.src = c->src;
+ pkt->hdr.dst = c->dst;
+ pkt->hdr.seq = c->snd.nxt;
+ pkt->hdr.ack = c->rcv.nxt;
+ pkt->hdr.wnd = c->frame_offset;
+ pkt->hdr.ctl = ACK | MF;
+ pkt->hdr.aux = 0;
+
+ uint32_t seglen = left;
+
+ buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
+ buffer_discard(&c->sndbuf, seglen);
+
+ c->snd.nxt += seglen;
+ c->snd.una = c->snd.nxt;
+
+ print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
+ c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
+ }
+
+ // Everything queued has been sent, so no partial frame remains at the start of the buffer
+ c->frame_offset = 0;
+ stop_retransmit_timer(c);
+}
+
+
+// Send data on an unreliable connection.
+//
+// Returns len on success, 0 with errno set to EWOULDBLOCK when the send
+// buffer currently lacks room, and -1 with errno set to EMSGSIZE when the
+// data exceeds MAX_UNRELIABLE_SIZE or the send buffer's maximum size.
+static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
+ if(len > MAX_UNRELIABLE_SIZE) {
+ errno = EMSGSIZE;
+ return -1;
+ }
+
+ // Framed connections need room for a 2-byte length header as well
+ size_t rlen = len + (is_framed(c) ? 2 : 0);
+
+ if(rlen > buffer_free(&c->sndbuf)) {
+ if(rlen > c->sndbuf.maxsize) {
+ errno = EMSGSIZE;
+ return -1;
+ } else {
errno = EWOULDBLOCK;
return 0;
- } else {
- return len;
}
}
- c->snd.last += len;
-
// Don't send anything yet if the connection has not fully established yet
+ // Note that len is returned here before anything has been put in the send buffer
if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
return len;
}
- ack(c, false);
+ if(is_framed(c)) {
+ uint16_t framelen = len;
+ buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
+ }
- if(!is_reliable(c)) {
+ buffer_put(&c->sndbuf, data, len);
+
+ c->snd.last += rlen;
+
+ if(is_framed(c)) {
+ ack_unreliable_framed(c);
+ } else {
+ // Unframed data is sent at once and then dropped from the send buffer
+ ack(c, false);
c->snd.una = c->snd.nxt = c->snd.last;
buffer_discard(&c->sndbuf, c->sndbuf.used);
}
- if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) {
- start_retransmit_timer(c);
+ return len;
+}
+
+// Send len bytes from data on connection c.
+//
+// Validates the connection state and arguments, then hands over to
+// utcp_send_reliable() or utcp_send_unreliable(). On a rejected call it
+// returns -1 with errno set to EBADF (reapable connection), ENOTCONN
+// (CLOSED or LISTEN), EPIPE (closing states) or EFAULT (NULL data with
+// non-zero len).
+ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
+ if(c->reapable) {
+ debug(c, "send() called on closed connection\n");
+ errno = EBADF;
+ return -1;
}
- if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) {
- clock_gettime(UTCP_CLOCK, &c->conn_timeout);
- c->conn_timeout.tv_sec += c->utcp->timeout;
+ switch(c->state) {
+ case CLOSED:
+ case LISTEN:
+ debug(c, "send() called on unconnected connection\n");
+ errno = ENOTCONN;
+ return -1;
+
+ case SYN_SENT:
+ case SYN_RECEIVED:
+ case ESTABLISHED:
+ case CLOSE_WAIT:
+ break;
+
+ case FIN_WAIT_1:
+ case FIN_WAIT_2:
+ case CLOSING:
+ case LAST_ACK:
+ case TIME_WAIT:
+ debug(c, "send() called on closed connection\n");
+ errno = EPIPE;
+ return -1;
}
- return len;
+ // A NULL data pointer is only acceptable for a zero-length send
+ if(!data && len) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ if(is_reliable(c)) {
+ return utcp_send_reliable(c, data, len);
+ } else {
+ return utcp_send_unreliable(c, data, len);
+ }
}
static void swap_ports(struct hdr *hdr) {
struct utcp *utcp = c->utcp;
- if(utcp->retransmit) {
+ if(utcp->retransmit && is_reliable(c)) {
utcp->retransmit(c);
}
case CLOSE_WAIT:
case CLOSING:
case LAST_ACK:
+ if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
+ flush_unreliable_framed(c);
+ return;
+ }
+
// Send unacked data again.
pkt->hdr.seq = c->snd.una;
pkt->hdr.ack = c->rcv.nxt;
}
}
+// Store an out-of-order segment of a framed connection. The offset is shifted
+// past any in-order data that is still held at the front of the receive
+// buffer (the first SACK entry when it starts at offset 0).
+static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
+	uint32_t buffered_in_order = 0;
+
+	if(c->sacks[0].offset == 0 && c->sacks[0].len) {
+		buffered_in_order = c->sacks[0].len;
+	}
+
+	// Put the data into the receive buffer
+	handle_out_of_order(c, offset + buffered_in_order, data, len);
+}
+
static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
if(c->recv) {
ssize_t rxd = c->recv(c, data, len);
len = c->sacks[0].offset + c->sacks[0].len;
size_t remainder = len - offset;
- if(c->recv) {
- ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder);
+ ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);
- if(rxd != (ssize_t)remainder) {
- // TODO: handle the application not accepting all data.
- abort();
- }
+ if(rxd != (ssize_t)remainder) {
+ // TODO: handle the application not accepting all data.
+ abort();
}
}
}
c->rcv.nxt += len;
}
+/* Handle an in-order segment on a framed connection.
+ *
+ * The segment is stored in the receive buffer behind any in-order data that
+ * is already held there. After that, every complete frame at the front of
+ * the buffer (a 2-byte length header followed by that many payload bytes) is
+ * handed to the receive callback, if one is set, and removed with
+ * sack_consume().
+ */
+static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
+	// Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
+	uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
+	handle_out_of_order(c, in_order_offset, data, len);
+
+	// While we have full frames at the start, give them to the application
+	while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
+		uint16_t framelen;
+		// Copy exactly the 2 header bytes: sizeof(&framelen) would be the
+		// pointer size and write past the end of framelen
+		buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(framelen));
+
+		// Stop at the first frame that has not been received completely
+		if(framelen > c->sacks[0].len - 2) {
+			break;
+		}
+
+		if(c->recv) {
+			ssize_t rxd;
+			// Position of the payload, just behind the header, with wrap-around
+			uint32_t realoffset = c->rcvbuf.offset + 2;
+
+			if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
+				realoffset -= c->rcvbuf.size;
+			}
+
+			if(realoffset > c->rcvbuf.size - framelen) {
+				// The buffer wraps, we need to copy
+				uint8_t buf[framelen];
+				buffer_copy(&c->rcvbuf, buf, 2, framelen);
+				rxd = c->recv(c, buf, framelen);
+			} else {
+				// The frame is contiguous in the receive buffer
+				rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
+			}
+
+			if(rxd != (ssize_t)framelen) {
+				// TODO: handle the application not accepting all data.
+				abort();
+			}
+		}
+
+		sack_consume(c, framelen + 2);
+	}
+
+	c->rcv.nxt += len;
+}
+
static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
// Fast path for unfragmented packets
if(!hdr->wnd && !(hdr->ctl & MF)) {
}
// Ensure reassembled packet are not larger than 64 kiB
- if(hdr->wnd >= MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
+ if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
return;
}
}
// Send the packet if it's the final fragment
- if(!(hdr->ctl & MF) && c->recv) {
- buffer_call(&c->rcvbuf, c->recv, c, 0, hdr->wnd + len);
+ if(!(hdr->ctl & MF)) {
+ buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);
}
c->rcv.nxt = hdr->seq + len;
}
+/* Handle a packet received on an unreliable, framed connection.
+ *
+ * hdr->wnd gives the number of leading bytes that continue a frame begun in
+ * an earlier packet. That continuation is only used when the packet is in
+ * order and the start of the frame is still in the receive buffer. Whole
+ * frames that follow are delivered straight from the packet, and a trailing
+ * partial frame is kept in the receive buffer for the next packet.
+ *
+ * The receive callback may be unset, so it is checked before every call;
+ * buffer bookkeeping is done regardless.
+ */
+static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
+	bool in_order = hdr->seq == c->rcv.nxt;
+	c->rcv.nxt = hdr->seq + len;
+
+	const uint8_t *ptr = data;
+	size_t left = len;
+
+	// Does it start with a partial frame?
+	if(hdr->wnd) {
+		// Only accept the data if it is in order
+		if(in_order && c->rcvbuf.used) {
+			// In order, append it to the receive buffer
+			buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
+
+			if(hdr->wnd <= len && c->recv) {
+				// We have a full frame; skip its 2-byte length header
+				c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
+			}
+		}
+
+		// Exit early if the whole packet belongs to the partial frame
+		if(hdr->wnd > len) {
+			if(!in_order) {
+				buffer_clear(&c->rcvbuf);
+			}
+
+			return;
+		}
+
+		ptr += hdr->wnd;
+		left -= hdr->wnd;
+	}
+
+	// We now start with new frames, so clear any data in the receive buffer
+	buffer_clear(&c->rcvbuf);
+
+	// Handle whole frames
+	while(left >= 2) {
+		uint16_t framelen;
+		memcpy(&framelen, ptr, sizeof(framelen));
+
+		if(left < (size_t)framelen + 2) {
+			break;
+		}
+
+		// Re-check on every frame: an earlier callback may have unset it
+		if(c->recv) {
+			c->recv(c, ptr + 2, framelen);
+		}
+
+		ptr += framelen + 2;
+		left -= framelen + 2;
+	}
+
+	// Handle partial last frame
+	if(left) {
+		buffer_put(&c->rcvbuf, ptr, left);
+	}
+}
+
+// Route received payload to the handler that matches the connection type:
+// unreliable or reliable, framed or unframed, and for reliable connections
+// in order or out of order.
static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
if(!is_reliable(c)) {
- handle_unreliable(c, hdr, data, len);
+ if(is_framed(c)) {
+ handle_unreliable_framed(c, hdr, data, len);
+ } else {
+ handle_unreliable(c, hdr, data, len);
+ }
+
return;
}
+ // A non-zero distance from rcv.nxt means the segment is out of order
uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
- if(offset) {
- handle_out_of_order(c, offset, data, len);
+ if(is_framed(c)) {
+ if(offset) {
+ handle_out_of_order_framed(c, offset, data, len);
+ } else {
+ handle_in_order_framed(c, data, len);
+ }
} else {
- handle_in_order(c, data, len);
+ if(offset) {
+ handle_out_of_order(c, offset, data, len);
+ } else {
+ handle_in_order(c, data, len);
+ }
}
}
int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
if(rcv_offset) {
- debug(c, "packet out of order, offset %u bytes", rcv_offset);
+ debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
}
#endif
c->snd.last++;
set_state(c, FIN_WAIT_1);
} else {
+ c->do_poll = true;
set_state(c, ESTABLISHED);
}
case SYN_RECEIVED:
case ESTABLISHED:
+ if(!is_reliable(c) && is_framed(c)) {
+ flush_unreliable_framed(c);
+ }
+
set_state(c, FIN_WAIT_1);
break;
c->snd.last++;
- ack(c, false);
+ ack(c, !is_reliable(c));
if(!timespec_isset(&c->rtrx_timeout)) {
start_retransmit_timer(c);
}
int utcp_close(struct utcp_connection *c) {
+ debug(c, "closing\n");
+
+ if(c->rcvbuf.used) {
+ debug(c, "receive buffer not empty, resetting\n");
+ return reset_connection(c) ? 0 : -1;
+ }
+
if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
return -1;
}
if(c->poll) {
if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
c->do_poll = false;
- uint32_t len = buffer_free(&c->sndbuf);
+ uint32_t len = is_framed(c) ? min(buffer_free(&c->sndbuf), MAX_UNRELIABLE_SIZE) : buffer_free(&c->sndbuf);
if(len) {
c->poll(c, len);
}
}
+// Store cb as the retransmit callback of this utcp instance.
-void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit) {
- utcp->retransmit = retransmit;
+void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
+ utcp->retransmit = cb;
}
// Store the given value in CLOCK_GRANULARITY.
// NOTE(review): the unit of granularity is not visible here - confirm against the declaration.
void utcp_set_clock_granularity(long granularity) {
CLOCK_GRANULARITY = granularity;
}
+
+// Return the flush timeout currently stored in this utcp instance.
+int utcp_get_flush_timeout(struct utcp *utcp) {
+	const int timeout = utcp->flush_timeout;
+	return timeout;
+}
+
+// Store a new flush timeout, given in milliseconds, in this utcp instance.
+void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
+	struct utcp *const instance = utcp;
+	instance->flush_timeout = milliseconds;
+}
+
+// Return the connection's flush_needed flag and reset it, so that a raised
+// flag is reported by one call only.
+bool utcp_get_flush_needed(struct utcp_connection *c) {
+	if(!c->flush_needed) {
+		return false;
+	}
+
+	c->flush_needed = false;
+	return true;
+}