(r)->tv_sec = (a)->tv_sec - (b)->tv_sec;\
(r)->tv_usec = (a)->tv_usec - (b)->tv_usec;\
if((r)->tv_usec < 0)\
- (r)->tv_sec--, (r)->tv_usec += 1000000;\
+ (r)->tv_sec--, (r)->tv_usec += USEC_PER_SEC;\
} while (0)
#endif
-#ifndef max
-#define max(a, b) ((a) > (b) ? (a) : (b))
-#endif
+static inline size_t max(size_t a, size_t b) {
+ return a > b ? a : b;
+}
#ifdef UTCP_DEBUG
#include <stdarg.h>
static void print_packet(struct utcp *utcp, const char *dir, const void *pkt, size_t len) {
struct hdr hdr;
if(len < sizeof hdr) {
- debug("%p %s: short packet (%zu bytes)\n", utcp, dir, len);
+ debug("%p %s: short packet (%lu bytes)\n", utcp, dir, (unsigned long)len);
return;
}
memcpy(&hdr, pkt, sizeof hdr);
- fprintf (stderr, "%p %s: len=%zu, src=%u dst=%u seq=%u ack=%u wnd=%u ctl=", utcp, dir, len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd);
+ debug("%p %s: len=%lu, src=%u dst=%u seq=%u ack=%u wnd=%u aux=%x ctl=", utcp, dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux);
if(hdr.ctl & SYN)
debug("SYN");
if(hdr.ctl & RST)
debug("ACK");
if(len > sizeof hdr) {
- debug(" data=");
- for(int i = sizeof hdr; i < len; i++) {
- const char *data = pkt;
- debug("%c", data[i] >= 32 ? data[i] : '.');
+ uint32_t datalen = len - sizeof hdr;
+ const uint8_t *data = (uint8_t *)pkt + sizeof hdr;
+ char str[datalen * 2 + 1];
+ char *p = str;
+
+ for(uint32_t i = 0; i < datalen; i++) {
+ *p++ = "0123456789ABCDEF"[data[i] >> 4];
+ *p++ = "0123456789ABCDEF"[data[i] & 15];
}
+ *p = 0;
+
+ debug(" data=%s", str);
}
debug("\n");
}
}
+static bool is_reliable(struct utcp_connection *c) {
+ return c->flags & UTCP_RELIABLE;
+}
+
static inline void list_connections(struct utcp *utcp) {
debug("%p has %d connections:\n", utcp, utcp->nconnections);
for(int i = 0; i < utcp->nconnections; i++)
// Store data into the buffer
static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
- if(buf->maxsize <= buf->used)
- return 0;
-
- debug("buffer_put_at %zu %zu %zu\n", buf->used, offset, len);
+ debug("buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
size_t required = offset + len;
if(required > buf->maxsize) {
if(offset >= buf->maxsize)
return 0;
- abort();
len = buf->maxsize - offset;
required = buf->maxsize;
}
} else {
do {
newsize *= 2;
- } while(newsize < buf->used + len);
+ } while(newsize < required);
}
if(newsize > buf->maxsize)
newsize = buf->maxsize;
memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof *cp);
utcp->nconnections--;
+ buffer_exit(&c->rcvbuf);
buffer_exit(&c->sndbuf);
free(c);
}
}
if(!buffer_init(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
+ buffer_exit(&c->sndbuf);
free(c);
return NULL;
}
debug("timeout cleared\n");
}
-struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
+struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
struct utcp_connection *c = allocate_connection(utcp, 0, dst);
if(!c)
return NULL;
+ assert((flags & ~0xf) == 0);
+
+ c->flags = flags;
c->recv = recv;
c->priv = priv;
- struct hdr hdr;
-
- hdr.src = c->src;
- hdr.dst = c->dst;
- hdr.seq = c->snd.iss;
- hdr.ack = 0;
- hdr.wnd = c->rcv.wnd;
- hdr.ctl = SYN;
- hdr.aux = 0;
+ struct {
+ struct hdr hdr;
+ uint8_t init[4];
+ } pkt;
+
+ pkt.hdr.src = c->src;
+ pkt.hdr.dst = c->dst;
+ pkt.hdr.seq = c->snd.iss;
+ pkt.hdr.ack = 0;
+ pkt.hdr.wnd = c->rcv.wnd;
+ pkt.hdr.ctl = SYN;
+ pkt.hdr.aux = 0x0101;
+ pkt.init[0] = 1;
+ pkt.init[1] = 0;
+ pkt.init[2] = 0;
+ pkt.init[3] = flags & 0x7;
set_state(c, SYN_SENT);
- print_packet(utcp, "send", &hdr, sizeof hdr);
- utcp->send(utcp, &hdr, sizeof hdr);
+ print_packet(utcp, "send", &pkt, sizeof pkt);
+ utcp->send(utcp, &pkt, sizeof pkt);
gettimeofday(&c->conn_timeout, NULL);
c->conn_timeout.tv_sec += utcp->timeout;
+ start_retransmit_timer(c);
+
return c;
}
+struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
+ return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
+}
+
void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
if(c->reapable || c->state != SYN_RECEIVED) {
debug("Error: accept() called on invalid connection %p in state %s\n", c, strstate[c->state]);
struct {
struct hdr hdr;
- char data[];
+ uint8_t data[];
} *pkt;
pkt = malloc(sizeof pkt->hdr + c->utcp->mtu);
return -1;
}
- // Add data to send buffer
+ // Exit early if we have nothing to send.
if(!len)
return 0;
return -1;
}
+ // Add data to send buffer.
+
len = buffer_put(&c->sndbuf, data, len);
if(len <= 0) {
errno = EWOULDBLOCK;
c->snd.last += len;
ack(c, false);
- if(!timerisset(&c->rtrx_timeout))
+ if(!is_reliable(c)) {
+ c->snd.una = c->snd.nxt = c->snd.last;
+ buffer_get(&c->sndbuf, NULL, c->sndbuf.used);
+ }
+ if(is_reliable(c) && !timerisset(&c->rtrx_timeout))
start_retransmit_timer(c);
return len;
}
}
static void retransmit(struct utcp_connection *c) {
- if(c->state == CLOSED || c->snd.nxt == c->snd.una)
+ if(c->state == CLOSED || c->snd.last == c->snd.una) {
+ debug("Retransmit() called but nothing to retransmit!\n");
+ stop_retransmit_timer(c);
return;
+ }
struct utcp *utcp = c->utcp;
struct {
struct hdr hdr;
- char data[];
+ uint8_t data[];
} *pkt;
pkt = malloc(sizeof pkt->hdr + c->utcp->mtu);
pkt->hdr.src = c->src;
pkt->hdr.dst = c->dst;
+ pkt->hdr.wnd = c->rcv.wnd;
+ pkt->hdr.aux = 0;
switch(c->state) {
case SYN_SENT:
// Send our SYN again
pkt->hdr.seq = c->snd.iss;
pkt->hdr.ack = 0;
- pkt->hdr.wnd = c->rcv.wnd;
pkt->hdr.ctl = SYN;
- print_packet(c->utcp, "rtrx", pkt, sizeof pkt->hdr);
- utcp->send(utcp, pkt, sizeof pkt->hdr);
+ pkt->hdr.aux = 0x0101;
+ pkt->data[0] = 1;
+ pkt->data[1] = 0;
+ pkt->data[2] = 0;
+ pkt->data[3] = c->flags & 0x7;
+ print_packet(c->utcp, "rtrx", pkt, sizeof pkt->hdr + 4);
+ utcp->send(utcp, pkt, sizeof pkt->hdr + 4);
break;
case SYN_RECEIVED:
free(pkt);
}
-// Update receive buffer and SACK entries after consuming data.
+/* Update receive buffer and SACK entries after consuming data.
+ *
+ * Situation:
+ *
+ * |.....0000..1111111111.....22222......3333|
+ * |---------------^
+ *
+ * 0..3 represent the SACK entries. The ^ indicates up to which point we want
+ * to remove data from the receive buffer. The idea is to substract "len"
+ * from the offset of all the SACK entries, and then remove/cut down entries
+ * that are shifted to before the start of the receive buffer.
+ *
+ * There are three cases:
+ * - the SACK entry is after ^, in that case just change the offset.
+ * - the SACK entry starts before and ends after ^, so we have to
+ * change both its offset and size.
+ * - the SACK entry is completely before ^, in that case delete it.
+ */
static void sack_consume(struct utcp_connection *c, size_t len) {
- debug("sack_consume %zu\n", len);
- if(len > c->rcvbuf.used)
- abort();
+ debug("sack_consume %lu\n", (unsigned long)len);
+ if(len > c->rcvbuf.used) {
+ debug("All SACK entries consumed");
+ c->sacks[0].len = 0;
+ return;
+ }
buffer_get(&c->rcvbuf, NULL, len);
c->sacks[i].offset -= len;
i++;
} else if(len < c->sacks[i].offset + c->sacks[i].len) {
- c->sacks[i].offset = 0;
c->sacks[i].len -= len - c->sacks[i].offset;
+ c->sacks[i].offset = 0;
i++;
} else {
if(i < NSACKS - 1) {
memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof c->sacks[i]);
- c->sacks[i + 1].len = 0;
+ c->sacks[NSACKS - 1].len = 0;
} else {
c->sacks[i].len = 0;
break;
memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof c->sacks[i]);
c->sacks[i].offset = offset;
c->sacks[i].len = rxd;
+ } else {
+ debug("SACK entries full, dropping packet\n");
}
break;
} else { // merge
static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
// Check if we can process out-of-order data now.
if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK
- debug("incoming packet len %zu connected with SACK at %u\n", len, c->sacks[0].offset);
+ debug("incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value
len = max(len, c->sacks[0].offset + c->sacks[0].len);
data = c->rcvbuf.data;
static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) {
+ if(!is_reliable(c)) {
+ c->recv(c, data, len);
+ c->rcv.nxt = seq + len;
+ return;
+ }
+
uint32_t offset = seqdiff(seq, c->rcv.nxt);
if(offset + len > c->rcvbuf.maxsize)
abort();
return -1;
}
+ // Check for auxiliary headers
+
+ const uint8_t *init = NULL;
+
+ uint16_t aux = hdr.aux;
+ while(aux) {
+ size_t auxlen = 4 * (aux >> 8) & 0xf;
+ uint8_t auxtype = aux & 0xff;
+
+ if(len < auxlen) {
+ errno = EBADMSG;
+ return -1;
+ }
+
+ switch(auxtype) {
+ case AUX_INIT:
+ if(!(hdr.ctl & SYN) || auxlen != 4) {
+ errno = EBADMSG;
+ return -1;
+ }
+ init = data;
+ break;
+ default:
+ errno = EBADMSG;
+ return -1;
+ }
+
+ len -= auxlen;
+ data += auxlen;
+
+ if(!(aux & 0x800))
+ break;
+
+ if(len < 2) {
+ errno = EBADMSG;
+ return -1;
+ }
+
+ memcpy(&aux, data, 2);
+ len -= 2;
+ data += 2;
+ }
+
// Try to match the packet to an existing connection
struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
goto reset;
}
+ // Parse auxilliary information
+ if(init) {
+ if(init[0] < 1) {
+ len = 1;
+ goto reset;
+ }
+ c->flags = init[3] & 0x7;
+ } else {
+ c->flags = UTCP_TCP;
+ }
+
// Return SYN+ACK, go to SYN_RECEIVED state
c->snd.wnd = hdr.wnd;
c->rcv.irs = hdr.seq;
c->rcv.nxt = c->rcv.irs + 1;
set_state(c, SYN_RECEIVED);
- hdr.dst = c->dst;
- hdr.src = c->src;
- hdr.ack = c->rcv.irs + 1;
- hdr.seq = c->snd.iss;
- hdr.ctl = SYN | ACK;
- print_packet(c->utcp, "send", &hdr, sizeof hdr);
- utcp->send(utcp, &hdr, sizeof hdr);
+ struct {
+ struct hdr hdr;
+ uint8_t data[4];
+ } pkt;
+
+ pkt.hdr.src = c->src;
+ pkt.hdr.dst = c->dst;
+ pkt.hdr.ack = c->rcv.irs + 1;
+ pkt.hdr.seq = c->snd.iss;
+ pkt.hdr.wnd = c->rcv.wnd;
+ pkt.hdr.ctl = SYN | ACK;
+ if(init) {
+ pkt.hdr.aux = 0x0101;
+ pkt.data[0] = 1;
+ pkt.data[1] = 0;
+ pkt.data[2] = 0;
+ pkt.data[3] = c->flags & 0x7;
+ print_packet(c->utcp, "send", &pkt, sizeof hdr + 4);
+ utcp->send(utcp, &pkt, sizeof hdr + 4);
+ } else {
+ pkt.hdr.aux = 0;
+ print_packet(c->utcp, "send", &pkt, sizeof hdr);
+ utcp->send(utcp, &pkt, sizeof hdr);
+ }
} else {
// No, we don't want your packets, send a RST back
len = 1;
// In case this is for a CLOSED connection, ignore the packet.
// TODO: make it so incoming packets can never match a CLOSED connection.
- if(c->state == CLOSED)
+ if(c->state == CLOSED) {
+ debug("Got packet for closed connection\n");
return 0;
+ }
// It is for an existing connection.
if(c->state == SYN_SENT)
acceptable = true;
-
- // TODO: handle packets overlapping c->rcv.nxt.
-#if 1
- // Only use this when accepting out-of-order packets.
else if(len == 0)
acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
- else
- acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
-#else
- if(c->state != SYN_SENT)
- acceptable = hdr.seq == c->rcv.nxt;
-#endif
+ else {
+ int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
+
+ // cut already accepted front overlapping
+ if(rcv_offset < 0) {
+ acceptable = len > -rcv_offset;
+ if(acceptable) {
+ data -= rcv_offset;
+ len += rcv_offset;
+ hdr.seq -= rcv_offset;
+ }
+ } else {
+ acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
+ }
+ }
if(!acceptable) {
- debug("Packet not acceptable, %u <= %u + %zu < %u\n", c->rcv.nxt, hdr.seq, len, c->rcv.nxt + c->rcvbuf.maxsize);
+ debug("Packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
// Ignore unacceptable RST packets.
if(hdr.ctl & RST)
return 0;
- // Otherwise, send an ACK back in the hope things improve.
- ack(c, true);
- return 0;
+ // Otherwise, continue processing.
+ len = 0;
}
c->snd.wnd = hdr.wnd; // TODO: move below
}
}
+ if(!(hdr.ctl & ACK))
+ goto skip_ack;
+
// 3. Advance snd.una
uint32_t advanced = seqdiff(hdr.ack, c->snd.una);
break;
}
} else {
- if(!len) {
+ if(!len && is_reliable(c)) {
c->dupack++;
if(c->dupack == 3) {
debug("Triplicate ACK\n");
//Reset the congestion window so we wait for ACKs.
c->snd.nxt = c->snd.una;
c->snd.cwnd = utcp->mtu;
+ start_retransmit_timer(c);
}
}
}
timerclear(&c->conn_timeout); // It will be set anew in utcp_timeout() if c->snd.una != c->snd.nxt.
if(c->snd.una == c->snd.last)
stop_retransmit_timer(c);
- else
+ else if(is_reliable(c))
start_retransmit_timer(c);
}
+skip_ack:
// 5. Process SYN stuff
if(hdr.ctl & SYN) {
case LAST_ACK:
case TIME_WAIT:
// Ehm, no. We should never receive a second SYN.
- goto reset;
+ return 0;
default:
#ifdef UTCP_DEBUG
abort();
// - or we got an ack, so we should maybe send a bit more data
// -> sendatleastone = false
- ack(c, prevrcvnxt != c->rcv.nxt);
+ ack(c, len || prevrcvnxt != c->rcv.nxt);
return 0;
reset:
swap_ports(&hdr);
hdr.wnd = 0;
+ hdr.aux = 0;
if(hdr.ctl & ACK) {
hdr.seq = hdr.ack;
hdr.ctl = RST;
c->snd.last++;
ack(c, false);
+ if(!timerisset(&c->rtrx_timeout))
+ start_retransmit_timer(c);
return 0;
}
int utcp_close(struct utcp_connection *c) {
- if(utcp_shutdown(c, SHUT_RDWR))
+ if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN)
return -1;
c->recv = NULL;
c->poll = NULL;
retransmit(c);
}
- if(c->poll && buffer_free(&c->sndbuf) && (c->state == ESTABLISHED || c->state == CLOSE_WAIT))
- c->poll(c, buffer_free(&c->sndbuf));
+ if(c->poll) {
+ if((c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
+ uint32_t len = buffer_free(&c->sndbuf);
+ if(len)
+ c->poll(c, len);
+ } else if(c->state == CLOSED) {
+ c->poll(c, 0);
+ }
+ }
if(timerisset(&c->conn_timeout) && timercmp(&c->conn_timeout, &next, <))
next = c->conn_timeout;
utcp->send = send;
utcp->priv = priv;
utcp->mtu = DEFAULT_MTU;
- utcp->timeout = DEFAULT_USER_TIMEOUT; // s
- utcp->rto = START_RTO; // us
+ utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
+ utcp->rto = START_RTO; // usec
return utcp;
}
if(!utcp)
return;
for(int i = 0; i < utcp->nconnections; i++) {
- if(!utcp->connections[i]->reapable)
- debug("Warning, freeing unclosed connection %p\n", utcp->connections[i]);
- buffer_exit(&utcp->connections[i]->sndbuf);
- free(utcp->connections[i]);
+ struct utcp_connection *c = utcp->connections[i];
+ if(!c->reapable)
+ if(c->recv)
+ c->recv(c, NULL, 0);
+ buffer_exit(&c->rcvbuf);
+ buffer_exit(&c->sndbuf);
+ free(c);
}
free(utcp->connections);
free(utcp);
utcp->mtu = mtu;
}
+void utcp_reset_timers(struct utcp *utcp) {
+ if(!utcp)
+ return;
+ struct timeval now, then;
+ gettimeofday(&now, NULL);
+ then = now;
+ then.tv_sec += utcp->timeout;
+ for(int i = 0; i < utcp->nconnections; i++) {
+ utcp->connections[i]->rtrx_timeout = now;
+ utcp->connections[i]->conn_timeout = then;
+ utcp->connections[i]->rtt_start.tv_sec = 0;
+ }
+ if(utcp->rto > START_RTO)
+ utcp->rto = START_RTO;
+}
+
int utcp_get_user_timeout(struct utcp *u) {
return u ? u->timeout : 0;
}