From 976c4e1591e60ba89e7dcc32b4a8106e7d4156e5 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Mon, 11 May 2020 19:52:00 +0200 Subject: [PATCH] Move UTCP into the MeshLink repository. UTCP is not used outside of MeshLink at the moment, and there is a tight coupling between the two, so it makes more sense to have it as part of MeshLink itself. --- .gitmodules | 3 - src/.gitignore | 4 +- src/Makefile.am | 9 +- src/node.h | 2 +- src/utcp | 1 - src/utcp-test.c | 390 ++++++ src/utcp.c | 2464 ++++++++++++++++++++++++++++++++++++ src/utcp.h | 125 ++ src/utcp_priv.h | 201 +++ test/Makefile.am | 5 +- test/stream.c | 156 +++ test/utcp-benchmark | 80 ++ test/utcp-benchmark-stream | 85 ++ 13 files changed, 3514 insertions(+), 11 deletions(-) delete mode 160000 src/utcp create mode 100644 src/utcp-test.c create mode 100644 src/utcp.c create mode 100644 src/utcp.h create mode 100644 src/utcp_priv.h create mode 100644 test/stream.c create mode 100755 test/utcp-benchmark create mode 100755 test/utcp-benchmark-stream diff --git a/.gitmodules b/.gitmodules index 3eaea0bd..67adce0b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,6 +2,3 @@ path = catta url = ../catta branch = develop -[submodule "src/utcp"] - path = src/utcp - url = ../utcp diff --git a/src/.gitignore b/src/.gitignore index c58c69b5..ae2b1916 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -1,3 +1 @@ -sptps_keypair -sptps_speed -sptps_test +utcp-test diff --git a/src/Makefile.am b/src/Makefile.am index d855d3a5..7d2e94e7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -26,10 +26,11 @@ chacha_poly1305_SOURCES = \ chacha-poly1305/poly1305.c chacha-poly1305/poly1305.h utcp_SOURCES = \ - utcp/utcp.c utcp/utcp.h \ - utcp/utcp_priv.h + utcp.c utcp.h \ + utcp_priv.h lib_LTLIBRARIES = libmeshlink.la +noinst_PROGRAMS = utcp-test pkginclude_HEADERS = meshlink++.h meshlink.h @@ -82,6 +83,10 @@ libmeshlink_la_SOURCES = \ $(chacha_poly1305_SOURCES) \ $(utcp_SOURCES) +utcp_test_SOURCES = \ + utcp-test.c \ + $(utcp_SOURCES) + EXTRA_libmeshlink_la_DEPENDENCIES = $(srcdir)/meshlink.sym libmeshlink_la_CFLAGS = $(PTHREAD_CFLAGS) -fPIC -iquote. diff --git a/src/node.h b/src/node.h index 733abb00..4c3f2afe 100644 --- a/src/node.h +++ b/src/node.h @@ -23,7 +23,7 @@ #include "event.h" #include "sockaddr.h" #include "sptps.h" -#include "utcp/utcp.h" +#include "utcp.h" #include "submesh.h" typedef struct node_status_t { diff --git a/src/utcp b/src/utcp deleted file mode 160000 index 4f634572..00000000 --- a/src/utcp +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 4f6345726724732b564dd55b20ecb3e20a127bd0 diff --git a/src/utcp-test.c b/src/utcp-test.c new file mode 100644 index 00000000..2a273de7 --- /dev/null +++ b/src/utcp-test.c @@ -0,0 +1,390 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "utcp.h" + +#define DIR_READ 1 +#define DIR_WRITE 2 + +static struct utcp_connection *c; +static int dir = DIR_READ | DIR_WRITE; +static long inpktno; +static long outpktno; +static long dropfrom; +static long dropto; +static double reorder; +static long reorder_dist = 10; +static double dropin; +static double dropout; +static long total_out; +static long total_in; +static FILE *reference; +static long mtu; +static long bufsize; + +static char *reorder_data; +static size_t reorder_len; +static int reorder_countdown; + +#if UTCP_DEBUG +static void debug(const char *format, ...) { + struct timespec tv; + char buf[1024]; + int len; + + clock_gettime(CLOCK_REALTIME, &tv); + len = snprintf(buf, sizeof(buf), "%ld.%06lu ", (long)tv.tv_sec, tv.tv_nsec / 1000); + va_list ap; + va_start(ap, format); + len += vsnprintf(buf + len, sizeof(buf) - len, format, ap); + va_end(ap); + + if(len > 0 && (size_t)len < sizeof(buf)) { + fwrite(buf, len, 1, stderr); + } +} +#else +#define debug(...) do {} while(0) +#endif + +static ssize_t do_recv(struct utcp_connection *c, const void *data, size_t len) { + (void)c; + + if(!data || !len) { + if(errno) { + debug("Error: %s\n", strerror(errno)); + dir = 0; + } else { + dir &= ~DIR_WRITE; + debug("Connection closed by peer\n"); + } + + return -1; + } + + if(reference) { + char buf[len]; + + if(fread(buf, len, 1, reference) != 1) { + debug("Error reading reference\n"); + abort(); + } + + if(memcmp(buf, data, len)) { + debug("Received data differs from reference\n"); + abort(); + } + } + + return write(1, data, len); +} + +static void do_accept(struct utcp_connection *nc, uint16_t port) { + (void)port; + utcp_accept(nc, do_recv, NULL); + c = nc; + + if(bufsize) { + utcp_set_sndbuf(c, bufsize); + utcp_set_rcvbuf(c, bufsize); + } + + utcp_set_accept_cb(c->utcp, NULL, NULL); +} + +static ssize_t do_send(struct utcp *utcp, const void *data, size_t len) { + int s = *(int *)utcp->priv; + outpktno++; + + if(outpktno >= dropfrom && outpktno < dropto) { + if(drand48() < dropout) { + debug("Dropped outgoing packet\n"); + return len; + } + + if(!reorder_data && drand48() < reorder) { + reorder_data = malloc(len); + + if(!reorder_data) { + debug("Out of memory\n"); + return len; + } + + reorder_len = len; + memcpy(reorder_data, data, len); + reorder_countdown = 1 + drand48() * reorder_dist; + return len; + } + } + + if(reorder_data) { + if(--reorder_countdown < 0) { + total_out += reorder_len; + send(s, reorder_data, reorder_len, MSG_DONTWAIT); + free(reorder_data); + reorder_data = NULL; + } + } + + total_out += len; + ssize_t result = send(s, data, len, MSG_DONTWAIT); + + if(result <= 0) { + debug("Error sending UDP packet: %s\n", strerror(errno)); + } + + return result; +} + +static void set_mtu(struct utcp *u, int s) { + if(!mtu) { + socklen_t optlen = sizeof(mtu); + getsockopt(s, IPPROTO_IP, IP_MTU, &mtu, &optlen); + } + + if(!mtu || mtu == 65535) { + mtu = 1500; + } + + debug("Using MTU %lu\n", mtu); + + utcp_set_mtu(u, mtu ? mtu - 28 : 1300); +} + +int main(int argc, char *argv[]) { + srand(time(NULL)); + srand48(time(NULL)); + + if(argc < 2 || argc > 3) { + return 1; + } + + bool server = argc == 2; + bool connected = false; + uint32_t flags = UTCP_TCP; + size_t read_size = 102400; + + if(getenv("DROPIN")) { + dropin = atof(getenv("DROPIN")); + } + + if(getenv("DROPOUT")) { + dropout = atof(getenv("DROPOUT")); + } + + if(getenv("DROPFROM")) { + dropfrom = atoi(getenv("DROPFROM")); + } + + if(getenv("DROPTO")) { + dropto = atoi(getenv("DROPTO")); + } + + if(getenv("REORDER")) { + reorder = atof(getenv("REORDER")); + } + + if(getenv("REORDER_DIST")) { + reorder_dist = atoi(getenv("REORDER_DIST")); + } + + if(getenv("FLAGS")) { + flags = atoi(getenv("FLAGS")); + } + + if(getenv("READ_SIZE")) { + read_size = atoi(getenv("READ_SIZE")); + } + + if(getenv("MTU")) { + mtu = atoi(getenv("MTU")); + } + + if(getenv("BUFSIZE")) { + bufsize = atoi(getenv("BUFSIZE")); + } + + char *reference_filename = getenv("REFERENCE"); + + if(reference_filename) { + reference = fopen(reference_filename, "r"); + } + + if(dropto < dropfrom) { + dropto = 1 << 30; + } + + struct addrinfo *ai; + + struct addrinfo hint = { + .ai_flags = server ? AI_PASSIVE : 0, + .ai_socktype = SOCK_DGRAM, + }; + + getaddrinfo(server ? NULL : argv[1], server ? argv[1] : argv[2], &hint, &ai); + + if(!ai) { + debug("Could not lookup address: %s\n", strerror(errno)); + return 1; + } + + int s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + + if(s == -1) { + debug("Could not create socket: %s\n", strerror(errno)); + return 1; + } + + static const int one = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one); + + if(server) { + if(bind(s, ai->ai_addr, ai->ai_addrlen)) { + debug("Could not bind: %s\n", strerror(errno)); + return 1; + } + } else { + if(connect(s, ai->ai_addr, ai->ai_addrlen)) { + debug("Could not connect: %s\n", strerror(errno)); + return 1; + } + + connected = true; + } + + freeaddrinfo(ai); + + struct utcp *u = utcp_init(server ? do_accept : NULL, NULL, do_send, &s); + + if(!u) { + debug("Could not initialize UTCP\n"); + return 1; + } + + utcp_set_user_timeout(u, 10); + + if(!server) { + set_mtu(u, s); + c = utcp_connect_ex(u, 1, do_recv, NULL, flags); + + if(bufsize) { + utcp_set_sndbuf(c, bufsize); + utcp_set_rcvbuf(c, bufsize); + } + } + + struct pollfd fds[2] = { + {.fd = 0, .events = POLLIN | POLLERR | POLLHUP}, + {.fd = s, .events = POLLIN | POLLERR | POLLHUP}, + }; + + char buf[102400]; + + struct timespec timeout = utcp_timeout(u); + + while(!connected || utcp_is_active(u)) { + size_t max = c ? utcp_get_sndbuf_free(c) : 0; + + if(max > sizeof(buf)) { + max = sizeof(buf); + } + + if(max > read_size) { + max = read_size; + } + + int timeout_ms = timeout.tv_sec * 1000 + timeout.tv_nsec / 1000000 + 1; + + debug("polling, dir = %d, timeout = %d\n", dir, timeout_ms); + + if((dir & DIR_READ) && max) { + poll(fds, 2, timeout_ms); + } else { + poll(fds + 1, 1, timeout_ms); + } + + if(fds[0].revents) { + fds[0].revents = 0; + ssize_t len = read(0, buf, max); + debug("stdin %zd\n", len); + + if(len <= 0) { + fds[0].fd = -1; + dir &= ~DIR_READ; + + if(c) { + utcp_shutdown(c, SHUT_WR); + } + + if(len == -1) { + break; + } else { + continue; + } + } + + if(c) { + ssize_t sent = utcp_send(c, buf, len); + + if(sent != len) { + debug("Short send: %zd != %zd\n", sent, len); + } + } + } + + if(fds[1].revents) { + fds[1].revents = 0; + struct sockaddr_storage ss; + socklen_t sl = sizeof(ss); + int len = recvfrom(s, buf, sizeof(buf), MSG_DONTWAIT, (struct sockaddr *)&ss, &sl); + debug("netin %zu\n", len); + + if(len <= 0) { + debug("Error receiving UDP packet: %s\n", strerror(errno)); + break; + } + + if(!connected) { + if(!connect(s, (struct sockaddr *)&ss, sl)) { + connected = true; + set_mtu(u, s); + } + } + + inpktno++; + + if(inpktno >= dropto || inpktno < dropfrom || drand48() >= dropin) { + total_in += len; + + if(utcp_recv(u, buf, len) == -1) { + debug("Error receiving UTCP packet: %s\n", strerror(errno)); + } + } else { + debug("Dropped incoming packet\n"); + } + } + + timeout = utcp_timeout(u); + }; + + utcp_close(c); + + utcp_exit(u); + + free(reorder_data); + + debug("Total bytes in: %ld, out: %ld\n", total_in, total_out); + + return 0; +} diff --git a/src/utcp.c b/src/utcp.c new file mode 100644 index 00000000..47acf221 --- /dev/null +++ b/src/utcp.c @@ -0,0 +1,2464 @@ +/* + utcp.c -- Userspace TCP + Copyright (C) 2014-2017 Guus Sliepen + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "utcp_priv.h" + +#ifndef EBADMSG +#define EBADMSG 104 +#endif + +#ifndef SHUT_RDWR +#define SHUT_RDWR 2 +#endif + +#ifdef poll +#undef poll +#endif + +#ifndef UTCP_CLOCK +#if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__) +#define UTCP_CLOCK CLOCK_MONOTONIC_RAW +#else +#define UTCP_CLOCK CLOCK_MONOTONIC +#endif +#endif + +static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) { + r->tv_sec = a->tv_sec - b->tv_sec; + r->tv_nsec = a->tv_nsec - b->tv_nsec; + + if(r->tv_nsec < 0) { + r->tv_sec--, r->tv_nsec += NSEC_PER_SEC; + } +} + +static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) { + return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000; +} + +static bool timespec_lt(const struct timespec *a, const struct timespec *b) { + if(a->tv_sec == b->tv_sec) { + return a->tv_nsec < b->tv_nsec; + } else { + return a->tv_sec < b->tv_sec; + } +} + +static void timespec_clear(struct timespec *a) { + a->tv_sec = 0; + a->tv_nsec = 0; +} + +static bool timespec_isset(const struct timespec *a) { + return a->tv_sec; +} + +static long CLOCK_GRANULARITY; // usec + +static inline size_t min(size_t a, size_t b) { + return a < b ? a : b; +} + +static inline size_t max(size_t a, size_t b) { + return a > b ? a : b; +} + +#ifdef UTCP_DEBUG +#include + +#ifndef UTCP_DEBUG_DATALEN +#define UTCP_DEBUG_DATALEN 20 +#endif + +static void debug(struct utcp_connection *c, const char *format, ...) { + struct timespec tv; + char buf[1024]; + int len; + + clock_gettime(CLOCK_REALTIME, &tv); + len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0); + va_list ap; + va_start(ap, format); + len += vsnprintf(buf + len, sizeof(buf) - len, format, ap); + va_end(ap); + + if(len > 0 && (size_t)len < sizeof(buf)) { + fwrite(buf, len, 1, stderr); + } +} + +static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) { + struct hdr hdr; + + if(len < sizeof(hdr)) { + debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len); + return; + } + + memcpy(&hdr, pkt, sizeof(hdr)); + + uint32_t datalen; + + if(len > sizeof(hdr)) { + datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN); + } else { + datalen = 0; + } + + + const uint8_t *data = (uint8_t *)pkt + sizeof(hdr); + char str[datalen * 2 + 1]; + char *p = str; + + for(uint32_t i = 0; i < datalen; i++) { + *p++ = "0123456789ABCDEF"[data[i] >> 4]; + *p++ = "0123456789ABCDEF"[data[i] & 15]; + } + + *p = 0; + + debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n", + dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux, + hdr.ctl & SYN ? "SYN" : "", + hdr.ctl & RST ? "RST" : "", + hdr.ctl & FIN ? "FIN" : "", + hdr.ctl & ACK ? "ACK" : "", + hdr.ctl & MF ? "MF" : "", + str + ); +} + +static void debug_cwnd(struct utcp_connection *c) { + debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0); +} +#else +#define debug(...) do {} while(0) +#define print_packet(...) do {} while(0) +#define debug_cwnd(...) do {} while(0) +#endif + +static void set_state(struct utcp_connection *c, enum state state) { + c->state = state; + + if(state == ESTABLISHED) { + timespec_clear(&c->conn_timeout); + } + + debug(c, "state %s\n", strstate[state]); +} + +static bool fin_wanted(struct utcp_connection *c, uint32_t seq) { + if(seq != c->snd.last) { + return false; + } + + switch(c->state) { + case FIN_WAIT_1: + case CLOSING: + case LAST_ACK: + return true; + + default: + return false; + } +} + +static bool is_reliable(struct utcp_connection *c) { + return c->flags & UTCP_RELIABLE; +} + +static int32_t seqdiff(uint32_t a, uint32_t b) { + return a - b; +} + +// Buffer functions +static bool buffer_wraps(struct buffer *buf) { + return buf->size - buf->offset < buf->used; +} + +static bool buffer_resize(struct buffer *buf, uint32_t newsize) { + char *newdata = realloc(buf->data, newsize); + + if(!newdata) { + return false; + } + + buf->data = newdata; + + if(buffer_wraps(buf)) { + // Shift the right part of the buffer until it hits the end of the new buffer. + // Old situation: + // [345......012] + // New situation: + // [345.........|........012] + uint32_t tailsize = buf->size - buf->offset; + uint32_t newoffset = newsize - tailsize; + memmove(buf->data + newoffset, buf->data + buf->offset, tailsize); + buf->offset = newoffset; + } + + buf->size = newsize; + return true; +} + +// Store data into the buffer +static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) { + debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len); + + // Ensure we don't store more than maxsize bytes in total + size_t required = offset + len; + + if(required > buf->maxsize) { + if(offset >= buf->maxsize) { + return 0; + } + + len = buf->maxsize - offset; + required = buf->maxsize; + } + + // Check if we need to resize the buffer + if(required > buf->size) { + size_t newsize = buf->size; + + if(!newsize) { + newsize = 4096; + } + + do { + newsize *= 2; + } while(newsize < required); + + if(newsize > buf->maxsize) { + newsize = buf->maxsize; + } + + if(!buffer_resize(buf, newsize)) { + return -1; + } + } + + uint32_t realoffset = buf->offset + offset; + + if(buf->size - buf->offset < offset) { + // The offset wrapped + realoffset -= buf->size; + } + + if(buf->size - realoffset < len) { + // The new chunk of data must be wrapped + memcpy(buf->data + realoffset, data, buf->size - realoffset); + memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset)); + } else { + memcpy(buf->data + realoffset, data, len); + } + + if(required > buf->used) { + buf->used = required; + } + + return len; +} + +static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) { + return buffer_put_at(buf, buf->used, data, len); +} + +// Copy data from the buffer without removing it. +static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) { + // Ensure we don't copy more than is actually stored in the buffer + if(offset >= buf->used) { + return 0; + } + + if(buf->used - offset < len) { + len = buf->used - offset; + } + + uint32_t realoffset = buf->offset + offset; + + if(buf->size - buf->offset < offset) { + // The offset wrapped + realoffset -= buf->size; + } + + if(buf->size - realoffset < len) { + // The data is wrapped + memcpy(data, buf->data + realoffset, buf->size - realoffset); + memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset)); + } else { + memcpy(data, buf->data + realoffset, len); + } + + return len; +} + +// Copy data from the buffer without removing it. +static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) { + // Ensure we don't copy more than is actually stored in the buffer + if(offset >= buf->used) { + return 0; + } + + if(buf->used - offset < len) { + len = buf->used - offset; + } + + uint32_t realoffset = buf->offset + offset; + + if(buf->size - buf->offset < offset) { + // The offset wrapped + realoffset -= buf->size; + } + + if(buf->size - realoffset < len) { + // The data is wrapped + ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset); + + if(rx1 < buf->size - realoffset) { + return rx1; + } + + ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset)); + + if(rx2 < 0) { + return rx2; + } else { + return rx1 + rx2; + } + } else { + return cb(arg, buf->data + realoffset, len); + } +} + +// Discard data from the buffer. +static ssize_t buffer_discard(struct buffer *buf, size_t len) { + if(buf->used < len) { + len = buf->used; + } + + if(buf->size - buf->offset < len) { + buf->offset -= buf->size; + } + + if(buf->used == len) { + buf->offset = 0; + } else { + buf->offset += len; + } + + buf->used -= len; + + return len; +} + +static void buffer_clear(struct buffer *buf) { + buf->used = 0; + buf->offset = 0; +} + +static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) { + if(maxsize < minsize) { + maxsize = minsize; + } + + buf->maxsize = maxsize; + + return buf->size >= minsize || buffer_resize(buf, minsize); +} + +static void buffer_exit(struct buffer *buf) { + free(buf->data); + memset(buf, 0, sizeof(*buf)); +} + +static uint32_t buffer_free(const struct buffer *buf) { + return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0; +} + +// Connections are stored in a sorted list. +// This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time. + +static int compare(const void *va, const void *vb) { + assert(va && vb); + + const struct utcp_connection *a = *(struct utcp_connection **)va; + const struct utcp_connection *b = *(struct utcp_connection **)vb; + + assert(a && b); + assert(a->src && b->src); + + int c = (int)a->src - (int)b->src; + + if(c) { + return c; + } + + c = (int)a->dst - (int)b->dst; + return c; +} + +static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) { + if(!utcp->nconnections) { + return NULL; + } + + struct utcp_connection key = { + .src = src, + .dst = dst, + }, *keyp = &key; + struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare); + return match ? *match : NULL; +} + +static void free_connection(struct utcp_connection *c) { + struct utcp *utcp = c->utcp; + struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare); + + assert(cp); + + int i = cp - utcp->connections; + memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp)); + utcp->nconnections--; + + buffer_exit(&c->rcvbuf); + buffer_exit(&c->sndbuf); + free(c); +} + +static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) { + // Check whether this combination of src and dst is free + + if(src) { + if(find_connection(utcp, src, dst)) { + errno = EADDRINUSE; + return NULL; + } + } else { // If src == 0, generate a random port number with the high bit set + if(utcp->nconnections >= 32767) { + errno = ENOMEM; + return NULL; + } + + src = rand() | 0x8000; + + while(find_connection(utcp, src, dst)) { + src++; + } + } + + // Allocate memory for the new connection + + if(utcp->nconnections >= utcp->nallocated) { + if(!utcp->nallocated) { + utcp->nallocated = 4; + } else { + utcp->nallocated *= 2; + } + + struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections)); + + if(!new_array) { + return NULL; + } + + utcp->connections = new_array; + } + + struct utcp_connection *c = calloc(1, sizeof(*c)); + + if(!c) { + return NULL; + } + + if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) { + free(c); + return NULL; + } + + if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) { + buffer_exit(&c->sndbuf); + free(c); + return NULL; + } + + // Fill in the details + + c->src = src; + c->dst = dst; +#ifdef UTCP_DEBUG + c->snd.iss = 0; +#else + c->snd.iss = rand(); +#endif + c->snd.una = c->snd.iss; + c->snd.nxt = c->snd.iss + 1; + c->snd.last = c->snd.nxt; + c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss; + c->snd.ssthresh = ~0; + debug_cwnd(c); + c->srtt = 0; + c->rttvar = 0; + c->rto = START_RTO; + c->utcp = utcp; + + // Add it to the sorted list of connections + + utcp->connections[utcp->nconnections++] = c; + qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare); + + return c; +} + +static inline uint32_t absdiff(uint32_t a, uint32_t b) { + if(a > b) { + return a - b; + } else { + return b - a; + } +} + +// Update RTT variables. See RFC 6298. +static void update_rtt(struct utcp_connection *c, uint32_t rtt) { + if(!rtt) { + debug(c, "invalid rtt\n"); + return; + } + + if(!c->srtt) { + c->srtt = rtt; + c->rttvar = rtt / 2; + } else { + c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4; + c->srtt = (c->srtt * 7 + rtt) / 8; + } + + c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY); + + if(c->rto > MAX_RTO) { + c->rto = MAX_RTO; + } + + debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto); +} + +static void start_retransmit_timer(struct utcp_connection *c) { + clock_gettime(UTCP_CLOCK, &c->rtrx_timeout); + + uint32_t rto = c->rto; + + while(rto > USEC_PER_SEC) { + c->rtrx_timeout.tv_sec++; + rto -= USEC_PER_SEC; + } + + c->rtrx_timeout.tv_nsec += rto * 1000; + + if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) { + c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC; + c->rtrx_timeout.tv_sec++; + } + + debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec); +} + +static void stop_retransmit_timer(struct utcp_connection *c) { + timespec_clear(&c->rtrx_timeout); + debug(c, "rtrx_timeout cleared\n"); +} + +struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) { + struct utcp_connection *c = allocate_connection(utcp, 0, dst); + + if(!c) { + return NULL; + } + + assert((flags & ~0x1f) == 0); + + c->flags = flags; + c->recv = recv; + c->priv = priv; + + struct { + struct hdr hdr; + uint8_t init[4]; + } pkt; + + pkt.hdr.src = c->src; + pkt.hdr.dst = c->dst; + pkt.hdr.seq = c->snd.iss; + pkt.hdr.ack = 0; + pkt.hdr.wnd = c->rcvbuf.maxsize; + pkt.hdr.ctl = SYN; + pkt.hdr.aux = 0x0101; + pkt.init[0] = 1; + pkt.init[1] = 0; + pkt.init[2] = 0; + pkt.init[3] = flags & 0x7; + + set_state(c, SYN_SENT); + + print_packet(c, "send", &pkt, sizeof(pkt)); + utcp->send(utcp, &pkt, sizeof(pkt)); + + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += utcp->timeout; + + start_retransmit_timer(c); + + return c; +} + +struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) { + return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP); +} + +void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { + if(c->reapable || c->state != SYN_RECEIVED) { + debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]); + return; + } + + debug(c, "accepted %p %p\n", c, recv, priv); + c->recv = recv; + c->priv = priv; + set_state(c, ESTABLISHED); +} + +static void ack(struct utcp_connection *c, bool sendatleastone) { + int32_t left = seqdiff(c->snd.last, c->snd.nxt); + int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE; + + assert(left >= 0); + + if(cwndleft <= 0) { + left = 0; + } else if(cwndleft < left) { + left = cwndleft; + + if(!sendatleastone || cwndleft > c->utcp->mss) { + left -= left % c->utcp->mss; + } + } + + debug(c, "cwndleft %d left %d\n", cwndleft, left); + + if(!left && !sendatleastone) { + return; + } + + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0; + pkt->hdr.ctl = ACK; + pkt->hdr.aux = 0; + + do { + uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left; + pkt->hdr.seq = c->snd.nxt; + + buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen); + + c->snd.nxt += seglen; + left -= seglen; + + if(!is_reliable(c)) { + if(left) { + pkt->hdr.ctl |= MF; + } else { + pkt->hdr.ctl &= ~MF; + } + } + + if(seglen && fin_wanted(c, c->snd.nxt)) { + seglen--; + pkt->hdr.ctl |= FIN; + } + + if(!c->rtt_start.tv_sec) { + // Start RTT measurement + clock_gettime(UTCP_CLOCK, &c->rtt_start); + c->rtt_seq = pkt->hdr.seq + seglen; + debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq); + } + + print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); + c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + + if(left && !is_reliable(c)) { + pkt->hdr.wnd += seglen; + } + } while(left); +} + +ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { + if(c->reapable) { + debug(c, "send() called on closed connection\n"); + errno = EBADF; + return -1; + } + + switch(c->state) { + case CLOSED: + case LISTEN: + debug(c, "send() called on unconnected connection\n"); + errno = ENOTCONN; + return -1; + + case SYN_SENT: + case SYN_RECEIVED: + case ESTABLISHED: + case CLOSE_WAIT: + break; + + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + debug(c, "send() called on closed connection\n"); + errno = EPIPE; + return -1; + } + + // Exit early if we have nothing to send. + + if(!len) { + return 0; + } + + if(!data) { + errno = EFAULT; + return -1; + } + + // Check if we need to be able to buffer all data + + if(c->flags & UTCP_NO_PARTIAL) { + if(len > buffer_free(&c->sndbuf)) { + if(len > c->sndbuf.maxsize) { + errno = EMSGSIZE; + return -1; + } else { + errno = EWOULDBLOCK; + return 0; + } + } + } + + // Add data to send buffer. + + if(is_reliable(c)) { + len = buffer_put(&c->sndbuf, data, len); + } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { + if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) { + errno = EMSGSIZE; + return -1; + } + } else { + return 0; + } + + if(len <= 0) { + if(is_reliable(c)) { + errno = EWOULDBLOCK; + return 0; + } else { + return len; + } + } + + c->snd.last += len; + + // Don't send anything yet if the connection has not fully established yet + + if(c->state == SYN_SENT || c->state == SYN_RECEIVED) { + return len; + } + + ack(c, false); + + if(!is_reliable(c)) { + c->snd.una = c->snd.nxt = c->snd.last; + buffer_discard(&c->sndbuf, c->sndbuf.used); + } + + if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { + start_retransmit_timer(c); + } + + if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) { + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += c->utcp->timeout; + } + + return len; +} + +static void swap_ports(struct hdr *hdr) { + uint16_t tmp = hdr->src; + hdr->src = hdr->dst; + hdr->dst = tmp; +} + +static void fast_retransmit(struct utcp_connection *c) { + if(c->state == CLOSED || c->snd.last == c->snd.una) { + debug(c, "fast_retransmit() called but nothing to retransmit!\n"); + return; + } + + struct utcp *utcp = c->utcp; + + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.wnd = c->rcvbuf.maxsize; + pkt->hdr.aux = 0; + + switch(c->state) { + case ESTABLISHED: + case FIN_WAIT_1: + case CLOSE_WAIT: + case CLOSING: + case LAST_ACK: + // Send unacked data again. + pkt->hdr.seq = c->snd.una; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.ctl = ACK; + uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss); + + if(fin_wanted(c, c->snd.una + len)) { + len--; + pkt->hdr.ctl |= FIN; + } + + buffer_copy(&c->sndbuf, pkt->data, 0, len); + print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len); + utcp->send(utcp, pkt, sizeof(pkt->hdr) + len); + break; + + default: + break; + } +} + +static void retransmit(struct utcp_connection *c) { + if(c->state == CLOSED || c->snd.last == c->snd.una) { + debug(c, "retransmit() called but nothing to retransmit!\n"); + stop_retransmit_timer(c); + return; + } + + struct utcp *utcp = c->utcp; + + if(utcp->retransmit) { + utcp->retransmit(c); + } + + struct { + struct hdr hdr; + uint8_t data[]; + } *pkt = c->utcp->pkt; + + pkt->hdr.src = c->src; + pkt->hdr.dst = c->dst; + pkt->hdr.wnd = c->rcvbuf.maxsize; + pkt->hdr.aux = 0; + + switch(c->state) { + case SYN_SENT: + // Send our SYN again + pkt->hdr.seq = c->snd.iss; + pkt->hdr.ack = 0; + pkt->hdr.ctl = SYN; + pkt->hdr.aux = 0x0101; + pkt->data[0] = 1; + pkt->data[1] = 0; + pkt->data[2] = 0; + pkt->data[3] = c->flags & 0x7; + print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4); + utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4); + break; + + case SYN_RECEIVED: + // Send SYNACK again + pkt->hdr.seq = c->snd.nxt; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.ctl = SYN | ACK; + print_packet(c, "rtrx", pkt, sizeof(pkt->hdr)); + utcp->send(utcp, pkt, sizeof(pkt->hdr)); + break; + + case ESTABLISHED: + case FIN_WAIT_1: + case CLOSE_WAIT: + case CLOSING: + case LAST_ACK: + // Send unacked data again. + pkt->hdr.seq = c->snd.una; + pkt->hdr.ack = c->rcv.nxt; + pkt->hdr.ctl = ACK; + uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss); + + if(fin_wanted(c, c->snd.una + len)) { + len--; + pkt->hdr.ctl |= FIN; + } + + // RFC 5681 slow start after timeout + uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una); + c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4 + c->snd.cwnd = utcp->mss; + debug_cwnd(c); + + buffer_copy(&c->sndbuf, pkt->data, 0, len); + print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len); + utcp->send(utcp, pkt, sizeof(pkt->hdr) + len); + + c->snd.nxt = c->snd.una + len; + break; + + case CLOSED: + case LISTEN: + case TIME_WAIT: + case FIN_WAIT_2: + // We shouldn't need to retransmit anything in this state. +#ifdef UTCP_DEBUG + abort(); +#endif + stop_retransmit_timer(c); + goto cleanup; + } + + start_retransmit_timer(c); + c->rto *= 2; + + if(c->rto > MAX_RTO) { + c->rto = MAX_RTO; + } + + c->rtt_start.tv_sec = 0; // invalidate RTT timer + c->dupack = 0; // cancel any ongoing fast recovery + +cleanup: + return; +} + +/* Update receive buffer and SACK entries after consuming data. + * + * Situation: + * + * |.....0000..1111111111.....22222......3333| + * |---------------^ + * + * 0..3 represent the SACK entries. The ^ indicates up to which point we want + * to remove data from the receive buffer. The idea is to substract "len" + * from the offset of all the SACK entries, and then remove/cut down entries + * that are shifted to before the start of the receive buffer. + * + * There are three cases: + * - the SACK entry is after ^, in that case just change the offset. + * - the SACK entry starts before and ends after ^, so we have to + * change both its offset and size. + * - the SACK entry is completely before ^, in that case delete it. + */ +static void sack_consume(struct utcp_connection *c, size_t len) { + debug(c, "sack_consume %lu\n", (unsigned long)len); + + if(len > c->rcvbuf.used) { + debug(c, "all SACK entries consumed\n"); + c->sacks[0].len = 0; + return; + } + + buffer_discard(&c->rcvbuf, len); + + for(int i = 0; i < NSACKS && c->sacks[i].len;) { + if(len < c->sacks[i].offset) { + c->sacks[i].offset -= len; + i++; + } else if(len < c->sacks[i].offset + c->sacks[i].len) { + c->sacks[i].len -= len - c->sacks[i].offset; + c->sacks[i].offset = 0; + i++; + } else { + if(i < NSACKS - 1) { + memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]); + c->sacks[NSACKS - 1].len = 0; + } else { + c->sacks[i].len = 0; + break; + } + } + } + + for(int i = 0; i < NSACKS && c->sacks[i].len; i++) { + debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); + } +} + +static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) { + debug(c, "out of order packet, offset %u\n", offset); + // Packet loss or reordering occured. Store the data in the buffer. + ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len); + + if(rxd <= 0) { + debug(c, "packet outside receive buffer, dropping\n"); + return; + } + + if((size_t)rxd < len) { + debug(c, "packet partially outside receive buffer\n"); + len = rxd; + } + + // Make note of where we put it. + for(int i = 0; i < NSACKS; i++) { + if(!c->sacks[i].len) { // nothing to merge, add new entry + debug(c, "new SACK entry %d\n", i); + c->sacks[i].offset = offset; + c->sacks[i].len = rxd; + break; + } else if(offset < c->sacks[i].offset) { + if(offset + rxd < c->sacks[i].offset) { // insert before + if(!c->sacks[NSACKS - 1].len) { // only if room left + debug(c, "insert SACK entry at %d\n", i); + memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]); + c->sacks[i].offset = offset; + c->sacks[i].len = rxd; + } else { + debug(c, "SACK entries full, dropping packet\n"); + } + + break; + } else { // merge + debug(c, "merge with start of SACK entry at %d\n", i); + c->sacks[i].offset = offset; + break; + } + } else if(offset <= c->sacks[i].offset + c->sacks[i].len) { + if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge + debug(c, "merge with end of SACK entry at %d\n", i); + c->sacks[i].len = offset + rxd - c->sacks[i].offset; + // TODO: handle potential merge with next entry + } + + break; + } + } + + for(int i = 0; i < NSACKS && c->sacks[i].len; i++) { + debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); + } +} + +static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { + if(c->recv) { + ssize_t rxd = c->recv(c, data, len); + + if(rxd != (ssize_t)len) { + // TODO: handle the application not accepting all data. + abort(); + } + } + + // Check if we can process out-of-order data now. + if(c->sacks[0].len && len >= c->sacks[0].offset) { + debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset); + + if(len < c->sacks[0].offset + c->sacks[0].len) { + size_t offset = len; + len = c->sacks[0].offset + c->sacks[0].len; + size_t remainder = len - offset; + + if(c->recv) { + ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder); + + if(rxd != (ssize_t)remainder) { + // TODO: handle the application not accepting all data. + abort(); + } + } + } + } + + if(c->rcvbuf.used) { + sack_consume(c, len); + } + + c->rcv.nxt += len; +} + +static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { + // Fast path for unfragmented packets + if(!hdr->wnd && !(hdr->ctl & MF)) { + if(c->recv) { + c->recv(c, data, len); + } + + c->rcv.nxt = hdr->seq + len; + return; + } + + // Ensure reassembled packet are not larger than 64 kiB + if(hdr->wnd >= MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) { + return; + } + + // Don't accept out of order fragments + if(hdr->wnd && hdr->seq != c->rcv.nxt) { + return; + } + + // Reset the receive buffer for the first fragment + if(!hdr->wnd) { + buffer_clear(&c->rcvbuf); + } + + ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len); + + if(rxd != (ssize_t)len) { + return; + } + + // Send the packet if it's the final fragment + if(!(hdr->ctl & MF) && c->recv) { + buffer_call(&c->rcvbuf, c->recv, c, 0, hdr->wnd + len); + } + + c->rcv.nxt = hdr->seq + len; +} + +static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { + if(!is_reliable(c)) { + handle_unreliable(c, hdr, data, len); + return; + } + + uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt); + + if(offset) { + handle_out_of_order(c, offset, data, len); + } else { + handle_in_order(c, data, len); + } +} + + +ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { + const uint8_t *ptr = data; + + if(!utcp) { + errno = EFAULT; + return -1; + } + + if(!len) { + return 0; + } + + if(!data) { + errno = EFAULT; + return -1; + } + + // Drop packets smaller than the header + + struct hdr hdr; + + if(len < sizeof(hdr)) { + print_packet(NULL, "recv", data, len); + errno = EBADMSG; + return -1; + } + + // Make a copy from the potentially unaligned data to a struct hdr + + memcpy(&hdr, ptr, sizeof(hdr)); + + // Try to match the packet to an existing connection + + struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src); + print_packet(c, "recv", data, len); + + // Process the header + + ptr += sizeof(hdr); + len -= sizeof(hdr); + + // Drop packets with an unknown CTL flag + + if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) { + print_packet(NULL, "recv", data, len); + errno = EBADMSG; + return -1; + } + + // Check for auxiliary headers + + const uint8_t *init = NULL; + + uint16_t aux = hdr.aux; + + while(aux) { + size_t auxlen = 4 * (aux >> 8) & 0xf; + uint8_t auxtype = aux & 0xff; + + if(len < auxlen) { + errno = EBADMSG; + return -1; + } + + switch(auxtype) { + case AUX_INIT: + if(!(hdr.ctl & SYN) || auxlen != 4) { + errno = EBADMSG; + return -1; + } + + init = ptr; + break; + + default: + errno = EBADMSG; + return -1; + } + + len -= auxlen; + ptr += auxlen; + + if(!(aux & 0x800)) { + break; + } + + if(len < 2) { + errno = EBADMSG; + return -1; + } + + memcpy(&aux, ptr, 2); + len -= 2; + ptr += 2; + } + + bool has_data = len || (hdr.ctl & (SYN | FIN)); + + // Is it for a new connection? + + if(!c) { + // Ignore RST packets + + if(hdr.ctl & RST) { + return 0; + } + + // Is it a SYN packet and are we LISTENing? + + if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) { + // If we don't want to accept it, send a RST back + if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) { + len = 1; + goto reset; + } + + // Try to allocate memory, otherwise send a RST back + c = allocate_connection(utcp, hdr.dst, hdr.src); + + if(!c) { + len = 1; + goto reset; + } + + // Parse auxilliary information + if(init) { + if(init[0] < 1) { + len = 1; + goto reset; + } + + c->flags = init[3] & 0x7; + } else { + c->flags = UTCP_TCP; + } + +synack: + // Return SYN+ACK, go to SYN_RECEIVED state + c->snd.wnd = hdr.wnd; + c->rcv.irs = hdr.seq; + c->rcv.nxt = c->rcv.irs + 1; + set_state(c, SYN_RECEIVED); + + struct { + struct hdr hdr; + uint8_t data[4]; + } pkt; + + pkt.hdr.src = c->src; + pkt.hdr.dst = c->dst; + pkt.hdr.ack = c->rcv.irs + 1; + pkt.hdr.seq = c->snd.iss; + pkt.hdr.wnd = c->rcvbuf.maxsize; + pkt.hdr.ctl = SYN | ACK; + + if(init) { + pkt.hdr.aux = 0x0101; + pkt.data[0] = 1; + pkt.data[1] = 0; + pkt.data[2] = 0; + pkt.data[3] = c->flags & 0x7; + print_packet(c, "send", &pkt, sizeof(hdr) + 4); + utcp->send(utcp, &pkt, sizeof(hdr) + 4); + } else { + pkt.hdr.aux = 0; + print_packet(c, "send", &pkt, sizeof(hdr)); + utcp->send(utcp, &pkt, sizeof(hdr)); + } + + start_retransmit_timer(c); + } else { + // No, we don't want your packets, send a RST back + len = 1; + goto reset; + } + + return 0; + } + + debug(c, "state %s\n", strstate[c->state]); + + // In case this is for a CLOSED connection, ignore the packet. + // TODO: make it so incoming packets can never match a CLOSED connection. + + if(c->state == CLOSED) { + debug(c, "got packet for closed connection\n"); + return 0; + } + + // It is for an existing connection. + + // 1. Drop invalid packets. + + // 1a. Drop packets that should not happen in our current state. + + switch(c->state) { + case SYN_SENT: + case SYN_RECEIVED: + case ESTABLISHED: + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSE_WAIT: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + break; + + default: +#ifdef UTCP_DEBUG + abort(); +#endif + break; + } + + // 1b. Discard data that is not in our receive window. + + if(is_reliable(c)) { + bool acceptable; + + if(c->state == SYN_SENT) { + acceptable = true; + } else if(len == 0) { + acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0; + } else { + int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); + + // cut already accepted front overlapping + if(rcv_offset < 0) { + acceptable = len > (size_t) - rcv_offset; + + if(acceptable) { + ptr -= rcv_offset; + len += rcv_offset; + hdr.seq -= rcv_offset; + } + } else { + acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize; + } + } + + if(!acceptable) { + debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize); + + // Ignore unacceptable RST packets. + if(hdr.ctl & RST) { + return 0; + } + + // Otherwise, continue processing. + len = 0; + } + } else { +#if UTCP_DEBUG + int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); + + if(rcv_offset) { + debug(c, "packet out of order, offset %u bytes", rcv_offset); + } + +#endif + } + + c->snd.wnd = hdr.wnd; // TODO: move below + + // 1c. Drop packets with an invalid ACK. + // ackno should not roll back, and it should also not be bigger than what we ever could have sent + // (= snd.una + c->sndbuf.used). + + if(!is_reliable(c)) { + if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) { + hdr.ack = c->snd.una; + } + } + + if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) { + debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used); + + // Ignore unacceptable RST packets. + if(hdr.ctl & RST) { + return 0; + } + + goto reset; + } + + // 2. Handle RST packets + + if(hdr.ctl & RST) { + switch(c->state) { + case SYN_SENT: + if(!(hdr.ctl & ACK)) { + return 0; + } + + // The peer has refused our connection. + set_state(c, CLOSED); + errno = ECONNREFUSED; + + if(c->recv) { + c->recv(c, NULL, 0); + } + + if(c->poll && !c->reapable) { + c->poll(c, 0); + } + + return 0; + + case SYN_RECEIVED: + if(hdr.ctl & ACK) { + return 0; + } + + // We haven't told the application about this connection yet. Silently delete. + free_connection(c); + return 0; + + case ESTABLISHED: + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSE_WAIT: + if(hdr.ctl & ACK) { + return 0; + } + + // The peer has aborted our connection. + set_state(c, CLOSED); + errno = ECONNRESET; + + if(c->recv) { + c->recv(c, NULL, 0); + } + + if(c->poll && !c->reapable) { + c->poll(c, 0); + } + + return 0; + + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + if(hdr.ctl & ACK) { + return 0; + } + + // As far as the application is concerned, the connection has already been closed. + // If it has called utcp_close() already, we can immediately free this connection. + if(c->reapable) { + free_connection(c); + return 0; + } + + // Otherwise, immediately move to the CLOSED state. + set_state(c, CLOSED); + return 0; + + default: +#ifdef UTCP_DEBUG + abort(); +#endif + break; + } + } + + uint32_t advanced; + + if(!(hdr.ctl & ACK)) { + advanced = 0; + goto skip_ack; + } + + // 3. Advance snd.una + + advanced = seqdiff(hdr.ack, c->snd.una); + + if(advanced) { + // RTT measurement + if(c->rtt_start.tv_sec) { + if(c->rtt_seq == hdr.ack) { + struct timespec now; + clock_gettime(UTCP_CLOCK, &now); + int32_t diff = timespec_diff_usec(&now, &c->rtt_start); + update_rtt(c, diff); + c->rtt_start.tv_sec = 0; + } else if(c->rtt_seq < hdr.ack) { + debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack); + c->rtt_start.tv_sec = 0; + } + } + + int32_t data_acked = advanced; + + switch(c->state) { + case SYN_SENT: + case SYN_RECEIVED: + data_acked--; + break; + + // TODO: handle FIN as well. + default: + break; + } + + assert(data_acked >= 0); + +#ifndef NDEBUG + int32_t bufused = seqdiff(c->snd.last, c->snd.una); + assert(data_acked <= bufused); +#endif + + if(data_acked) { + buffer_discard(&c->sndbuf, data_acked); + + if(is_reliable(c)) { + c->do_poll = true; + } + } + + // Also advance snd.nxt if possible + if(seqdiff(c->snd.nxt, hdr.ack) < 0) { + c->snd.nxt = hdr.ack; + } + + c->snd.una = hdr.ack; + + if(c->dupack) { + if(c->dupack >= 3) { + debug(c, "fast recovery ended\n"); + c->snd.cwnd = c->snd.ssthresh; + } + + c->dupack = 0; + } + + // Increase the congestion window according to RFC 5681 + if(c->snd.cwnd < c->snd.ssthresh) { + c->snd.cwnd += min(advanced, utcp->mss); // eq. 2 + } else { + c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3 + } + + if(c->snd.cwnd > c->sndbuf.maxsize) { + c->snd.cwnd = c->sndbuf.maxsize; + } + + debug_cwnd(c); + + // Check if we have sent a FIN that is now ACKed. + switch(c->state) { + case FIN_WAIT_1: + if(c->snd.una == c->snd.last) { + set_state(c, FIN_WAIT_2); + } + + break; + + case CLOSING: + if(c->snd.una == c->snd.last) { + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += utcp->timeout; + set_state(c, TIME_WAIT); + } + + break; + + default: + break; + } + } else { + if(!len && is_reliable(c) && c->snd.una != c->snd.last) { + c->dupack++; + debug(c, "duplicate ACK %d\n", c->dupack); + + if(c->dupack == 3) { + // RFC 5681 fast recovery + debug(c, "fast recovery started\n", c->dupack); + uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una); + c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4 + c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize); + + if(c->snd.cwnd > c->sndbuf.maxsize) { + c->snd.cwnd = c->sndbuf.maxsize; + } + + debug_cwnd(c); + + fast_retransmit(c); + } else if(c->dupack > 3) { + c->snd.cwnd += utcp->mss; + + if(c->snd.cwnd > c->sndbuf.maxsize) { + c->snd.cwnd = c->sndbuf.maxsize; + } + + debug_cwnd(c); + } + + // We got an ACK which indicates the other side did get one of our packets. + // Reset the retransmission timer to avoid going to slow start, + // but don't touch the connection timeout. + start_retransmit_timer(c); + } + } + + // 4. Update timers + + if(advanced) { + if(c->snd.una == c->snd.last) { + stop_retransmit_timer(c); + timespec_clear(&c->conn_timeout); + } else if(is_reliable(c)) { + start_retransmit_timer(c); + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += utcp->timeout; + } + } + +skip_ack: + // 5. Process SYN stuff + + if(hdr.ctl & SYN) { + switch(c->state) { + case SYN_SENT: + + // This is a SYNACK. It should always have ACKed the SYN. + if(!advanced) { + goto reset; + } + + c->rcv.irs = hdr.seq; + c->rcv.nxt = hdr.seq + 1; + + if(c->shut_wr) { + c->snd.last++; + set_state(c, FIN_WAIT_1); + } else { + set_state(c, ESTABLISHED); + } + + break; + + case SYN_RECEIVED: + // This is a retransmit of a SYN, send back the SYNACK. + goto synack; + + case ESTABLISHED: + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSE_WAIT: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + // This could be a retransmission. Ignore the SYN flag, but send an ACK back. + break; + + default: +#ifdef UTCP_DEBUG + abort(); +#endif + return 0; + } + } + + // 6. Process new data + + if(c->state == SYN_RECEIVED) { + // This is the ACK after the SYNACK. It should always have ACKed the SYNACK. + if(!advanced) { + goto reset; + } + + // Are we still LISTENing? + if(utcp->accept) { + utcp->accept(c, c->src); + } + + if(c->state != ESTABLISHED) { + set_state(c, CLOSED); + c->reapable = true; + goto reset; + } + } + + if(len) { + switch(c->state) { + case SYN_SENT: + case SYN_RECEIVED: + // This should never happen. +#ifdef UTCP_DEBUG + abort(); +#endif + return 0; + + case ESTABLISHED: + case FIN_WAIT_1: + case FIN_WAIT_2: + break; + + case CLOSE_WAIT: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + // Ehm no, We should never receive more data after a FIN. + goto reset; + + default: +#ifdef UTCP_DEBUG + abort(); +#endif + return 0; + } + + handle_incoming_data(c, &hdr, ptr, len); + } + + // 7. Process FIN stuff + + if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) { + switch(c->state) { + case SYN_SENT: + case SYN_RECEIVED: + // This should never happen. +#ifdef UTCP_DEBUG + abort(); +#endif + break; + + case ESTABLISHED: + set_state(c, CLOSE_WAIT); + break; + + case FIN_WAIT_1: + set_state(c, CLOSING); + break; + + case FIN_WAIT_2: + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += utcp->timeout; + set_state(c, TIME_WAIT); + break; + + case CLOSE_WAIT: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + // Ehm, no. We should never receive a second FIN. + goto reset; + + default: +#ifdef UTCP_DEBUG + abort(); +#endif + break; + } + + // FIN counts as one sequence number + c->rcv.nxt++; + len++; + + // Inform the application that the peer closed its end of the connection. + if(c->recv) { + errno = 0; + c->recv(c, NULL, 0); + } + } + + // Now we send something back if: + // - we received data, so we have to send back an ACK + // -> sendatleastone = true + // - or we got an ack, so we should maybe send a bit more data + // -> sendatleastone = false + + if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) { + ack(c, has_data); + } + + return 0; + +reset: + swap_ports(&hdr); + hdr.wnd = 0; + hdr.aux = 0; + + if(hdr.ctl & ACK) { + hdr.seq = hdr.ack; + hdr.ctl = RST; + } else { + hdr.ack = hdr.seq + len; + hdr.seq = 0; + hdr.ctl = RST | ACK; + } + + print_packet(c, "send", &hdr, sizeof(hdr)); + utcp->send(utcp, &hdr, sizeof(hdr)); + return 0; + +} + +int utcp_shutdown(struct utcp_connection *c, int dir) { + debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0); + + if(!c) { + errno = EFAULT; + return -1; + } + + if(c->reapable) { + debug(c, "shutdown() called on closed connection\n"); + errno = EBADF; + return -1; + } + + if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) { + errno = EINVAL; + return -1; + } + + // TCP does not have a provision for stopping incoming packets. + // The best we can do is to just ignore them. + if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) { + c->recv = NULL; + } + + // The rest of the code deals with shutting down writes. + if(dir == UTCP_SHUT_RD) { + return 0; + } + + // Only process shutting down writes once. + if(c->shut_wr) { + return 0; + } + + c->shut_wr = true; + + switch(c->state) { + case CLOSED: + case LISTEN: + errno = ENOTCONN; + return -1; + + case SYN_SENT: + return 0; + + case SYN_RECEIVED: + case ESTABLISHED: + set_state(c, FIN_WAIT_1); + break; + + case FIN_WAIT_1: + case FIN_WAIT_2: + return 0; + + case CLOSE_WAIT: + set_state(c, CLOSING); + break; + + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + return 0; + } + + c->snd.last++; + + ack(c, false); + + if(!timespec_isset(&c->rtrx_timeout)) { + start_retransmit_timer(c); + } + + return 0; +} + +static bool reset_connection(struct utcp_connection *c) { + if(!c) { + errno = EFAULT; + return false; + } + + if(c->reapable) { + debug(c, "abort() called on closed connection\n"); + errno = EBADF; + return false; + } + + c->recv = NULL; + c->poll = NULL; + + switch(c->state) { + case CLOSED: + return true; + + case LISTEN: + case SYN_SENT: + case CLOSING: + case LAST_ACK: + case TIME_WAIT: + set_state(c, CLOSED); + return true; + + case SYN_RECEIVED: + case ESTABLISHED: + case FIN_WAIT_1: + case FIN_WAIT_2: + case CLOSE_WAIT: + set_state(c, CLOSED); + break; + } + + // Send RST + + struct hdr hdr; + + hdr.src = c->src; + hdr.dst = c->dst; + hdr.seq = c->snd.nxt; + hdr.ack = 0; + hdr.wnd = 0; + hdr.ctl = RST; + + print_packet(c, "send", &hdr, sizeof(hdr)); + c->utcp->send(c->utcp, &hdr, sizeof(hdr)); + return true; +} + +// Closes all the opened connections +void utcp_abort_all_connections(struct utcp *utcp) { + if(!utcp) { + errno = EINVAL; + return; + } + + for(int i = 0; i < utcp->nconnections; i++) { + struct utcp_connection *c = utcp->connections[i]; + + if(c->reapable || c->state == CLOSED) { + continue; + } + + utcp_recv_t old_recv = c->recv; + utcp_poll_t old_poll = c->poll; + + reset_connection(c); + + if(old_recv) { + errno = 0; + old_recv(c, NULL, 0); + } + + if(old_poll && !c->reapable) { + errno = 0; + old_poll(c, 0); + } + } + + return; +} + +int utcp_close(struct utcp_connection *c) { + if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) { + return -1; + } + + c->recv = NULL; + c->poll = NULL; + c->reapable = true; + return 0; +} + +int utcp_abort(struct utcp_connection *c) { + if(!reset_connection(c)) { + return -1; + } + + c->reapable = true; + return 0; +} + +/* Handle timeouts. + * One call to this function will loop through all connections, + * checking if something needs to be resent or not. + * The return value is the time to the next timeout in milliseconds, + * or maybe a negative value if the timeout is infinite. + */ +struct timespec utcp_timeout(struct utcp *utcp) { + struct timespec now; + clock_gettime(UTCP_CLOCK, &now); + struct timespec next = {now.tv_sec + 3600, now.tv_nsec}; + + for(int i = 0; i < utcp->nconnections; i++) { + struct utcp_connection *c = utcp->connections[i]; + + if(!c) { + continue; + } + + // delete connections that have been utcp_close()d. + if(c->state == CLOSED) { + if(c->reapable) { + debug(c, "reaping\n"); + free_connection(c); + i--; + } + + continue; + } + + if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) { + errno = ETIMEDOUT; + c->state = CLOSED; + + if(c->recv) { + c->recv(c, NULL, 0); + } + + if(c->poll && !c->reapable) { + c->poll(c, 0); + } + + continue; + } + + if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) { + debug(c, "retransmitting after timeout\n"); + retransmit(c); + } + + if(c->poll) { + if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) { + c->do_poll = false; + uint32_t len = buffer_free(&c->sndbuf); + + if(len) { + c->poll(c, len); + } + } else if(c->state == CLOSED) { + c->poll(c, 0); + } + } + + if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) { + next = c->conn_timeout; + } + + if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) { + next = c->rtrx_timeout; + } + } + + struct timespec diff; + + timespec_sub(&next, &now, &diff); + + return diff; +} + +bool utcp_is_active(struct utcp *utcp) { + if(!utcp) { + return false; + } + + for(int i = 0; i < utcp->nconnections; i++) + if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) { + return true; + } + + return false; +} + +struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) { + if(!send) { + errno = EFAULT; + return NULL; + } + + struct utcp *utcp = calloc(1, sizeof(*utcp)); + + if(!utcp) { + return NULL; + } + + utcp_set_mtu(utcp, DEFAULT_MTU); + + if(!utcp->pkt) { + free(utcp); + return NULL; + } + + if(!CLOCK_GRANULARITY) { + struct timespec res; + clock_getres(UTCP_CLOCK, &res); + CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000; + } + + utcp->accept = accept; + utcp->pre_accept = pre_accept; + utcp->send = send; + utcp->priv = priv; + utcp->timeout = DEFAULT_USER_TIMEOUT; // sec + + return utcp; +} + +void utcp_exit(struct utcp *utcp) { + if(!utcp) { + return; + } + + for(int i = 0; i < utcp->nconnections; i++) { + struct utcp_connection *c = utcp->connections[i]; + + if(!c->reapable) { + if(c->recv) { + c->recv(c, NULL, 0); + } + + if(c->poll && !c->reapable) { + c->poll(c, 0); + } + } + + buffer_exit(&c->rcvbuf); + buffer_exit(&c->sndbuf); + free(c); + } + + free(utcp->connections); + free(utcp->pkt); + free(utcp); +} + +uint16_t utcp_get_mtu(struct utcp *utcp) { + return utcp ? utcp->mtu : 0; +} + +uint16_t utcp_get_mss(struct utcp *utcp) { + return utcp ? utcp->mss : 0; +} + +void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) { + if(!utcp) { + return; + } + + if(mtu <= sizeof(struct hdr)) { + return; + } + + if(mtu > utcp->mtu) { + char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr)); + + if(!new) { + return; + } + + utcp->pkt = new; + } + + utcp->mtu = mtu; + utcp->mss = mtu - sizeof(struct hdr); +} + +void utcp_reset_timers(struct utcp *utcp) { + if(!utcp) { + return; + } + + struct timespec now, then; + + clock_gettime(UTCP_CLOCK, &now); + + then = now; + + then.tv_sec += utcp->timeout; + + for(int i = 0; i < utcp->nconnections; i++) { + struct utcp_connection *c = utcp->connections[i]; + + if(c->reapable) { + continue; + } + + if(timespec_isset(&c->rtrx_timeout)) { + c->rtrx_timeout = now; + } + + if(timespec_isset(&c->conn_timeout)) { + c->conn_timeout = then; + } + + c->rtt_start.tv_sec = 0; + + if(c->rto > START_RTO) { + c->rto = START_RTO; + } + } +} + +int utcp_get_user_timeout(struct utcp *u) { + return u ? u->timeout : 0; +} + +void utcp_set_user_timeout(struct utcp *u, int timeout) { + if(u) { + u->timeout = timeout; + } +} + +size_t utcp_get_sndbuf(struct utcp_connection *c) { + return c ? c->sndbuf.maxsize : 0; +} + +size_t utcp_get_sndbuf_free(struct utcp_connection *c) { + if(!c) { + return 0; + } + + switch(c->state) { + case SYN_SENT: + case SYN_RECEIVED: + case ESTABLISHED: + case CLOSE_WAIT: + return buffer_free(&c->sndbuf); + + default: + return 0; + } +} + +void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { + if(!c) { + return; + } + + c->sndbuf.maxsize = size; + + if(c->sndbuf.maxsize != size) { + c->sndbuf.maxsize = -1; + } + + c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); +} + +size_t utcp_get_rcvbuf(struct utcp_connection *c) { + return c ? c->rcvbuf.maxsize : 0; +} + +size_t utcp_get_rcvbuf_free(struct utcp_connection *c) { + if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) { + return buffer_free(&c->rcvbuf); + } else { + return 0; + } +} + +void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) { + if(!c) { + return; + } + + c->rcvbuf.maxsize = size; + + if(c->rcvbuf.maxsize != size) { + c->rcvbuf.maxsize = -1; + } +} + +size_t utcp_get_sendq(struct utcp_connection *c) { + return c->sndbuf.used; +} + +size_t utcp_get_recvq(struct utcp_connection *c) { + return c->rcvbuf.used; +} + +bool utcp_get_nodelay(struct utcp_connection *c) { + return c ? c->nodelay : false; +} + +void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) { + if(c) { + c->nodelay = nodelay; + } +} + +bool utcp_get_keepalive(struct utcp_connection *c) { + return c ? c->keepalive : false; +} + +void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) { + if(c) { + c->keepalive = keepalive; + } +} + +size_t utcp_get_outq(struct utcp_connection *c) { + return c ? seqdiff(c->snd.nxt, c->snd.una) : 0; +} + +void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) { + if(c) { + c->recv = recv; + } +} + +void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) { + if(c) { + c->poll = poll; + c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); + } +} + +void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) { + if(utcp) { + utcp->accept = accept; + utcp->pre_accept = pre_accept; + } +} + +void utcp_expect_data(struct utcp_connection *c, bool expect) { + if(!c || c->reapable) { + return; + } + + if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) { + return; + } + + if(expect) { + // If we expect data, start the connection timer. + if(!timespec_isset(&c->conn_timeout)) { + clock_gettime(UTCP_CLOCK, &c->conn_timeout); + c->conn_timeout.tv_sec += c->utcp->timeout; + } + } else { + // If we want to cancel expecting data, only clear the timer when there is no unACKed data. + if(c->snd.una == c->snd.last) { + timespec_clear(&c->conn_timeout); + } + } +} + +void utcp_offline(struct utcp *utcp, bool offline) { + struct timespec now; + clock_gettime(UTCP_CLOCK, &now); + + for(int i = 0; i < utcp->nconnections; i++) { + struct utcp_connection *c = utcp->connections[i]; + + if(c->reapable) { + continue; + } + + utcp_expect_data(c, offline); + + if(!offline) { + if(timespec_isset(&c->rtrx_timeout)) { + c->rtrx_timeout = now; + } + + utcp->connections[i]->rtt_start.tv_sec = 0; + + if(c->rto > START_RTO) { + c->rto = START_RTO; + } + } + } +} + +void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit) { + utcp->retransmit = retransmit; +} + +void utcp_set_clock_granularity(long granularity) { + CLOCK_GRANULARITY = granularity; +} diff --git a/src/utcp.h b/src/utcp.h new file mode 100644 index 00000000..ff509ac9 --- /dev/null +++ b/src/utcp.h @@ -0,0 +1,125 @@ +/* + utcp.h -- Userspace TCP + Copyright (C) 2014 Guus Sliepen + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#ifndef UTCP_H +#define UTCP_H + +#include +#include +#include +// TODO: Windows +#include + +#ifndef UTCP_INTERNAL +struct utcp { + void *priv; +}; + +struct utcp_connection { + void *priv; + struct utcp *const utcp; + const uint32_t flags; +}; +#else +struct utcp; +struct utcp_connection; +#endif + +#define UTCP_SHUT_RD 0 +#define UTCP_SHUT_WR 1 +#define UTCP_SHUT_RDWR 2 + +#define UTCP_ORDERED 1 +#define UTCP_RELIABLE 2 +#define UTCP_FRAMED 4 +#define UTCP_DROP_LATE 8 +#define UTCP_NO_PARTIAL 16 + +#define UTCP_TCP 3 +#define UTCP_UDP 0 + +typedef bool (*utcp_pre_accept_t)(struct utcp *utcp, uint16_t port); +typedef void (*utcp_accept_t)(struct utcp_connection *utcp_connection, uint16_t port); +typedef void (*utcp_retransmit_t)(struct utcp_connection *connection); + +typedef ssize_t (*utcp_send_t)(struct utcp *utcp, const void *data, size_t len); +typedef ssize_t (*utcp_recv_t)(struct utcp_connection *connection, const void *data, size_t len); + +typedef void (*utcp_poll_t)(struct utcp_connection *connection, size_t len); + +extern struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv); +extern void utcp_exit(struct utcp *utcp); + +extern struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t port, utcp_recv_t recv, void *priv, uint32_t flags); +extern struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t port, utcp_recv_t recv, void *priv); +extern void utcp_accept(struct utcp_connection *utcp, utcp_recv_t recv, void *priv); +extern ssize_t utcp_send(struct utcp_connection *connection, const void *data, size_t len); +extern ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len); +extern int utcp_close(struct utcp_connection *connection); +extern int utcp_abort(struct utcp_connection *connection); +extern int utcp_shutdown(struct utcp_connection *connection, int how); +extern struct timespec utcp_timeout(struct utcp *utcp); +extern void utcp_set_recv_cb(struct utcp_connection *connection, utcp_recv_t recv); +extern void utcp_set_poll_cb(struct utcp_connection *connection, utcp_poll_t poll); +extern void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept); +extern bool utcp_is_active(struct utcp *utcp); +extern void utcp_abort_all_connections(struct utcp *utcp); + +// Global socket options + +extern int utcp_get_user_timeout(struct utcp *utcp); +extern void utcp_set_user_timeout(struct utcp *utcp, int seconds); + +extern uint16_t utcp_get_mtu(struct utcp *utcp); +extern uint16_t utcp_get_mss(struct utcp *utcp); +extern void utcp_set_mtu(struct utcp *utcp, uint16_t mtu); + +extern void utcp_reset_timers(struct utcp *utcp); + +extern void utcp_offline(struct utcp *utcp, bool offline); +extern void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit); + +// Per-socket options + +extern size_t utcp_get_sndbuf(struct utcp_connection *connection); +extern void utcp_set_sndbuf(struct utcp_connection *connection, size_t size); +extern size_t utcp_get_sndbuf_free(struct utcp_connection *connection); + +extern size_t utcp_get_rcvbuf(struct utcp_connection *connection); +extern void utcp_set_rcvbuf(struct utcp_connection *connection, size_t size); +extern size_t utcp_get_rcvbuf_free(struct utcp_connection *connection); + +extern size_t utcp_get_sendq(struct utcp_connection *connection); +extern size_t utcp_get_recvq(struct utcp_connection *connection); + +extern bool utcp_get_nodelay(struct utcp_connection *connection); +extern void utcp_set_nodelay(struct utcp_connection *connection, bool nodelay); + +extern bool utcp_get_keepalive(struct utcp_connection *connection); +extern void utcp_set_keepalive(struct utcp_connection *connection, bool keepalive); + +extern size_t utcp_get_outq(struct utcp_connection *connection); + +extern void utcp_expect_data(struct utcp_connection *connection, bool expect); + +// Completely global options + +extern void utcp_set_clock_granularity(long granularity); + +#endif diff --git a/src/utcp_priv.h b/src/utcp_priv.h new file mode 100644 index 00000000..197fd268 --- /dev/null +++ b/src/utcp_priv.h @@ -0,0 +1,201 @@ +/* + utcp.h -- Userspace TCP + Copyright (C) 2014 Guus Sliepen + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#ifndef UTCP_PRIV_H +#define UTCP_PRIV_H + +#define UTCP_INTERNAL +#include "utcp.h" + +#define PREP(l) char pkt[(l) + sizeof struct hdr]; struct hdr *hdr = &pkt; + +#define SYN 1 +#define ACK 2 +#define FIN 4 +#define RST 8 +#define MF 16 + +#define AUX_INIT 1 +#define AUX_FRAME 2 +#define AUX_SAK 3 +#define AUX_TIMESTAMP 4 + +#define NSACKS 4 +#define DEFAULT_SNDBUFSIZE 4096 +#define DEFAULT_MAXSNDBUFSIZE 131072 +#define DEFAULT_RCVBUFSIZE 0 +#define DEFAULT_MAXRCVBUFSIZE 131072 + +#define MAX_UNRELIABLE_SIZE 65536 +#define DEFAULT_MTU 1000 + +#define USEC_PER_SEC 1000000L +#define NSEC_PER_SEC 1000000000L +#define DEFAULT_USER_TIMEOUT 60 +#define START_RTO (1 * USEC_PER_SEC) +#define MAX_RTO (3 * USEC_PER_SEC) + +struct hdr { + uint16_t src; // Source port + uint16_t dst; // Destination port + uint32_t seq; // Sequence number + uint32_t ack; // Acknowledgement number + uint32_t wnd; // Window size + uint16_t ctl; // Flags (SYN, ACK, FIN, RST) + uint16_t aux; // other stuff +}; + +enum state { + CLOSED, + LISTEN, + SYN_SENT, + SYN_RECEIVED, + ESTABLISHED, + FIN_WAIT_1, + FIN_WAIT_2, + CLOSE_WAIT, + CLOSING, + LAST_ACK, + TIME_WAIT +}; + +static const char *strstate[] __attribute__((unused)) = { + [CLOSED] = "CLOSED", + [LISTEN] = "LISTEN", + [SYN_SENT] = "SYN_SENT", + [SYN_RECEIVED] = "SYN_RECEIVED", + [ESTABLISHED] = "ESTABLISHED", + [FIN_WAIT_1] = "FIN_WAIT_1", + [FIN_WAIT_2] = "FIN_WAIT_2", + [CLOSE_WAIT] = "CLOSE_WAIT", + [CLOSING] = "CLOSING", + [LAST_ACK] = "LAST_ACK", + [TIME_WAIT] = "TIME_WAIT" +}; + +struct buffer { + char *data; + uint32_t offset; + uint32_t used; + uint32_t size; + uint32_t maxsize; +}; + +struct sack { + uint32_t offset; + uint32_t len; +}; + +struct utcp_connection { + void *priv; + struct utcp *utcp; + uint32_t flags; + + bool reapable; + bool do_poll; + + // Callbacks + + utcp_recv_t recv; + utcp_poll_t poll; + + // TCP State + + uint16_t src; + uint16_t dst; + enum state state; + + struct { + uint32_t una; + uint32_t nxt; + uint32_t wnd; + uint32_t iss; + + uint32_t last; + uint32_t cwnd; + uint32_t ssthresh; + } snd; + + struct { + uint32_t nxt; + uint32_t irs; + } rcv; + + int dupack; + + // Timers + + struct timespec conn_timeout; + struct timespec rtrx_timeout; + struct timespec rtt_start; + uint32_t rtt_seq; + + // RTT variables + + uint32_t srtt; // usec + uint32_t rttvar; // usec + uint32_t rto; // usec + + // Buffers + + uint32_t prev_free; + struct buffer sndbuf; + struct buffer rcvbuf; + struct sack sacks[NSACKS]; + + // Per-socket options + + bool nodelay; + bool keepalive; + bool shut_wr; + + // Congestion avoidance state + + struct timespec tlast; + uint64_t bandwidth; +}; + +struct utcp { + void *priv; + + // Callbacks + + utcp_accept_t accept; + utcp_pre_accept_t pre_accept; + utcp_retransmit_t retransmit; + utcp_send_t send; + + // Packet buffer + + void *pkt; + + // Global socket options + + uint16_t mtu; // The maximum size of a UTCP packet, including headers. + uint16_t mss; // The maximum size of the payload of a UTCP packet. + int timeout; // sec + + // Connection management + + struct utcp_connection **connections; + int nconnections; + int nallocated; +}; + +#endif diff --git a/test/Makefile.am b/test/Makefile.am index f631ea50..2a022a6c 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -18,7 +18,9 @@ TESTS = \ invite-join \ sign-verify \ trio \ - trio2 + trio2 \ + utcp-benchmark \ + utcp-benchmark-stream if BLACKBOX_TESTS SUBDIRS = blackbox @@ -49,6 +51,7 @@ check_PROGRAMS = \ import-export \ invite-join \ sign-verify \ + stream \ trio \ trio2 diff --git a/test/stream.c b/test/stream.c new file mode 100644 index 00000000..378809cc --- /dev/null +++ b/test/stream.c @@ -0,0 +1,156 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +int main(int argc, char *argv[]) { + static const struct option longopts[] = { + {"verify", 0, NULL, 'v'}, + {"rate", 1, NULL, 'r'}, + {"fps", 1, NULL, 'f'}, + {"total", 1, NULL, 't'}, + {NULL, 0, NULL, 0}, + }; + + int opt; + bool verify = false; + float rate = 1e6; + float fps = 60; + float total = 1.0 / 0.0; + + while((opt = getopt_long(argc, argv, "vr:f:t:", longopts, &optind)) != -1) { + switch(opt) { + case 'v': + verify = true; + break; + + case 'r': + rate = atof(optarg); + break; + + case 'f': + fps = atof(optarg); + break; + + case 't': + total = atof(optarg); + break; + + default: + fprintf(stderr, "Usage: %s [-v] [-r bitrate] [-f frames_per_second]\n", argv[0]); + return 1; + } + } + + size_t framesize = rate / fps / 8; + framesize &= ~0xf; + long interval = 1e9 / fps; + + if(!framesize || interval <= 0) { + err(1, "invalid parameters"); + } + + char *buf = malloc(framesize + 16); + + if(!buf) { + err(1, "malloc(%zu)", framesize); + } + + uint64_t counter = 0; + struct timespec now, next = {0}; + clock_gettime(CLOCK_REALTIME, &now); + + while(total > 0) { + if(!verify) { + size_t tosend = framesize; + char *p = buf; + + memcpy(buf, &now, sizeof(now)); + + for(uint64_t *q = (uint64_t *)(buf + sizeof(now)); (char *)q < buf + framesize; q++) { + *q = counter++; + } + + while(tosend) { + ssize_t sent = write(1, p, tosend); + + if(sent <= 0) { + err(1, "write(1, %p, %zu)", p, tosend); + } + + tosend -= sent; + p += sent; + } + + next = now; + next.tv_nsec += interval; + + while(next.tv_nsec >= 1000000000) { + next.tv_nsec -= 1000000000; + next.tv_sec++; + } + + clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &next, NULL); + now = next; + total -= framesize; + } else { + struct timespec *ts = (struct timespec *)buf; + size_t toread = sizeof(*ts); + char *p = buf; + + while(toread) { + ssize_t result = read(0, p, toread); + + if(result <= 0) { + err(1, "read(1, %p, %zu)", p, toread); + } + + toread -= result; + p += result; + } + + clock_gettime(CLOCK_REALTIME, &now); + + toread = framesize - sizeof(now); + + while(toread) { + ssize_t result = read(0, p, toread); + + if(result <= 0) { + err(1, "read(1, %p, %zu)", p, toread); + } + + toread -= result; + p += result; + } + + clock_gettime(CLOCK_REALTIME, &next); + + for(uint64_t *q = (uint64_t *)(buf + sizeof(now)); (char *)q < buf + framesize; q++) { + if(*q != counter++) { + uint64_t offset = (counter - 1) * 8; + offset += ((counter * 8) / (framesize - sizeof(now))) * sizeof(now); + err(1, "verification failed at offset %lu", offset); + } + } + + float dt1 = now.tv_sec - ts->tv_sec + 1e-9 * (now.tv_nsec - ts->tv_nsec); + float dt2 = next.tv_sec - now.tv_sec + 1e-9 * (next.tv_nsec - now.tv_nsec); + + fprintf(stderr, "\rDelay: %8.3f ms, burst bandwidth: %8.0f Mbit/s", dt1 * 1e3, (framesize - sizeof(now)) / dt2 * 8 / 1e6); + + total -= framesize; + } + } + + if(verify) { + fprintf(stderr, "\n"); + } +} diff --git a/test/utcp-benchmark b/test/utcp-benchmark new file mode 100755 index 00000000..d8eafd77 --- /dev/null +++ b/test/utcp-benchmark @@ -0,0 +1,80 @@ +#!/bin/bash +set -e + +# Require root permissions +test "$(id -u)" = "0" || exit 77 + +# Configuration +LOG_PREFIX=/dev/shm/utcp-benchmark-log +SIZE=10000000 + +# Network parameters +# Some realistic values: +# - Gbit LAN connection: RATE=1gbit DELAY=0.4ms JITTER=0.04ms LOSS=0% +# - Fast WAN connection: RATE=100mbit DELAY=50ms JITTER=3ms LOSS=0% +# - 5GHz WiFi connection: RATE=90mbit DELAY=5ms JITTER=1ms LOSS=0% +RATE=100mbit +DELAY=10ms +JITTER=1ms +LOSS=0.1% + +# Maximum achievable bandwidth is limited to BUFSIZE / (2 * DELAY) +# The Linux kernel has a default maximum send buffer of 4 MiB +#export BUFSIZE=4194304 + +# Remove old log files +rm -f $LOG_PREFIX-* 2>/dev/null + +# Clean up old namespaces +ip link del utcp-left 2>/dev/null || true +ip link del utcp-right 2>/dev/null || true +ip netns delete utcp-left 2>/dev/null || true +ip netns delete utcp-right 2>/dev/null || true + +# Set up the left namespace +ip netns add utcp-left +ip link add name utcp-left type veth peer name utcp-right +ip link set utcp-left netns utcp-left + +ip netns exec utcp-left ethtool -K utcp-left tso off +ip netns exec utcp-left ip link set dev lo up +ip netns exec utcp-left ip addr add dev utcp-left 192.168.1.1/24 +ip netns exec utcp-left ip link set utcp-left up + +#ip netns exec utcp-left tc qdisc del dev utcp-left root +ip netns exec utcp-left tc qdisc add dev utcp-left root netem rate $RATE delay $DELAY $JITTER loss random $LOSS + +# Set up the right namespace +ip netns add utcp-right +ip link set utcp-right netns utcp-right + +ip netns exec utcp-right ethtool -K utcp-right tso off +ip netns exec utcp-right ip link set dev lo up +ip netns exec utcp-right ip addr add dev utcp-right 192.168.1.2/24 +ip netns exec utcp-right ip link set utcp-right up + +#ip netns exec utcp-right tc qdisc del dev utcp-right root +ip netns exec utcp-right tc qdisc add dev utcp-right root netem rate $RATE delay $DELAY $JITTER loss random $LOSS +# Test using kernel TCP +ip netns exec utcp-right tcpdump -i utcp-right -w $LOG_PREFIX-socat.pcap port 9999 2>/dev/null & +ip netns exec utcp-left socat TCP4-LISTEN:9999 - >/dev/null & +sleep 0.1 +head -c $SIZE /dev/zero | ip netns exec utcp-right time socat - TCP4:192.168.1.1:9999 2>$LOG_PREFIX-socat-client.txt >/dev/null +sleep 0.1 +kill $(jobs -p) 2>/dev/null + +# Test using UTCP +ip netns exec utcp-right tcpdump -i utcp-right -w $LOG_PREFIX-utcp.pcap udp port 9999 2>/dev/null & +ip netns exec utcp-left ./test 9999 2>$LOG_PREFIX-server.txt >/dev/null & +sleep 0.1 +head -c $SIZE /dev/zero | ip netns exec utcp-right time ./test 192.168.1.1 9999 2>$LOG_PREFIX-client.txt >/dev/null +sleep 0.1 +kill $(jobs -p) 2>/dev/null + +# Print timing statistics +echo "Regular TCP:" +tail -2 $LOG_PREFIX-socat-client.txt + +echo +echo "UTCP:" +tail -3 $LOG_PREFIX-client.txt diff --git a/test/utcp-benchmark-stream b/test/utcp-benchmark-stream new file mode 100755 index 00000000..88c32e37 --- /dev/null +++ b/test/utcp-benchmark-stream @@ -0,0 +1,85 @@ +#!/bin/bash +set -e + +# Require root permissions +test "$(id -u)" = "0" || exit 77 + +# Configuration +LOG_PREFIX=/dev/shm/utcp-benchmark-log + +# Size in bytes +SIZE=2e6 + +# Rate of generated stream in bits/s +STREAMRATE=10e6 + +# Network parameters +# Some realistic values: +# - Gbit LAN connection: RATE=1gbit DELAY=0.4ms JITTER=0.04ms LOSS=0% +# - Fast WAN connection: RATE=100mbit DELAY=50ms JITTER=3ms LOSS=0% +# - 5GHz WiFi connection: RATE=90mbit DELAY=5ms JITTER=1ms LOSS=0% +RATE=100mbit +DELAY=10ms +JITTER=1ms +LOSS=0.1% + +# Maximum achievable bandwidth is limited to BUFSIZE / (2 * DELAY) +# The Linux kernel has a default maximum send buffer of 4 MiB +#export BUFSIZE=4194304 + +# Remove old log files +rm -f $LOG_PREFIX-* 2>/dev/null + +# Clean up old namespaces +ip link del utcp-left 2>/dev/null || true +ip link del utcp-right 2>/dev/null || true +ip netns delete utcp-left 2>/dev/null || true +ip netns delete utcp-right 2>/dev/null || true + +# Set up the left namespace +ip netns add utcp-left +ip link add name utcp-left type veth peer name utcp-right +ip link set utcp-left netns utcp-left + +ip netns exec utcp-left ethtool -K utcp-left tso off +ip netns exec utcp-left ip link set dev lo up +ip netns exec utcp-left ip addr add dev utcp-left 192.168.1.1/24 +ip netns exec utcp-left ip link set utcp-left up + +#ip netns exec utcp-left tc qdisc del dev utcp-left root +ip netns exec utcp-left tc qdisc add dev utcp-left root netem rate $RATE delay $DELAY $JITTER loss random $LOSS + +# Set up the right namespace +ip netns add utcp-right +ip link set utcp-right netns utcp-right + +ip netns exec utcp-right ethtool -K utcp-right tso off +ip netns exec utcp-right ip link set dev lo up +ip netns exec utcp-right ip addr add dev utcp-right 192.168.1.2/24 +ip netns exec utcp-right ip link set utcp-right up + +#ip netns exec utcp-right tc qdisc del dev utcp-right root +ip netns exec utcp-right tc qdisc add dev utcp-right root netem rate $RATE delay $DELAY $JITTER loss random $LOSS +# Test using kernel TCP +ip netns exec utcp-right tcpdump -i utcp-right -w $LOG_PREFIX-socat.pcap port 9999 2>/dev/null & +ip netns exec utcp-left socat TCP4-LISTEN:9999 - $LOG_PREFIX-socat-client.txt >/dev/null +sleep 0.1 +kill $(jobs -p) 2>/dev/null + +# Test using UTCP +ip netns exec utcp-right tcpdump -i utcp-right -w $LOG_PREFIX-utcp.pcap udp port 9999 2>/dev/null & +ip netns exec utcp-left ./test 9999 2>$LOG_PREFIX-server.txt $LOG_PREFIX-client.txt >/dev/null +sleep 0.1 +kill $(jobs -p) 2>/dev/null + +# Print timing statistics +echo "Regular TCP:" +tail -2 $LOG_PREFIX-socat-client.txt + +echo +echo "UTCP:" +tail -3 $LOG_PREFIX-client.txt -- 2.39.2