/*
    utcp.c -- Userspace TCP
    Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along
    with this program; if not, write to the Free Software Foundation, Inc.,
    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>

#include "utcp_priv.h"
#if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
#define UTCP_CLOCK CLOCK_MONOTONIC_RAW
#else
#define UTCP_CLOCK CLOCK_MONOTONIC
#endif
static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
	r->tv_sec = a->tv_sec - b->tv_sec;
	r->tv_nsec = a->tv_nsec - b->tv_nsec;

	if(r->tv_nsec < 0) {
		r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
	}
}

static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
	return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000;
}

static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
	if(a->tv_sec == b->tv_sec) {
		return a->tv_nsec < b->tv_nsec;
	} else {
		return a->tv_sec < b->tv_sec;
	}
}

static void timespec_clear(struct timespec *a) {
	a->tv_sec = 0;
}

static bool timespec_isset(const struct timespec *a) {
	return a->tv_sec;
}
static long CLOCK_GRANULARITY; // usec

static inline size_t min(size_t a, size_t b) {
	return a < b ? a : b;
}

static inline size_t max(size_t a, size_t b) {
	return a > b ? a : b;
}
#ifdef UTCP_DEBUG
#ifndef UTCP_DEBUG_DATALEN
#define UTCP_DEBUG_DATALEN 20
#endif

static void debug(struct utcp_connection *c, const char *format, ...) {
	struct timespec tv;
	char buf[1024];
	int len;

	clock_gettime(CLOCK_REALTIME, &tv);
	len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);

	va_list ap;
	va_start(ap, format);
	len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
	va_end(ap);

	if(len > 0 && (size_t)len < sizeof(buf)) {
		fwrite(buf, len, 1, stderr);
	}
}
static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
	struct hdr hdr;

	if(len < sizeof(hdr)) {
		debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
		return;
	}

	memcpy(&hdr, pkt, sizeof(hdr));

	uint32_t datalen = 0;

	if(len > sizeof(hdr)) {
		datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
	}

	const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
	char str[datalen * 2 + 1];
	char *p = str;

	for(uint32_t i = 0; i < datalen; i++) {
		*p++ = "0123456789ABCDEF"[data[i] >> 4];
		*p++ = "0123456789ABCDEF"[data[i] & 15];
	}

	*p = 0;

	debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n",
	      dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
	      hdr.ctl & SYN ? "SYN" : "",
	      hdr.ctl & RST ? "RST" : "",
	      hdr.ctl & FIN ? "FIN" : "",
	      hdr.ctl & ACK ? "ACK" : "",
	      hdr.ctl & MF ? "MF" : "",
	      str);
}
static void debug_cwnd(struct utcp_connection *c) {
	debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
}
#else
#define debug(...) do {} while(0)
#define print_packet(...) do {} while(0)
#define debug_cwnd(...) do {} while(0)
#endif
static void set_state(struct utcp_connection *c, enum state state) {
	c->state = state;

	if(state == ESTABLISHED) {
		timespec_clear(&c->conn_timeout);
	}

	debug(c, "state %s\n", strstate[state]);
}
static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
	if(seq != c->snd.last) {
		return false;
	}

	switch(c->state) {
	case FIN_WAIT_1:
	case CLOSING:
	case LAST_ACK:
		return true;

	default:
		return false;
	}
}
static bool is_reliable(struct utcp_connection *c) {
	return c->flags & UTCP_RELIABLE;
}

static bool is_framed(struct utcp_connection *c) {
	return c->flags & UTCP_FRAMED;
}
static int32_t seqdiff(uint32_t a, uint32_t b) {
	return a - b;
}
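/* seqdiff() relies on wrapping unsigned arithmetic: for example,
   seqdiff(0x00000002, 0xffffffff) == 3, so ordering comparisons keep
   working across sequence number wraparound as long as the two values
   are less than 2^31 apart. */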
static bool buffer_wraps(struct buffer *buf) {
	return buf->size - buf->offset < buf->used;
}
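/* Illustrative example: with size 10, offset 7 and used 5, the stored bytes
   occupy cells 7, 8, 9, 0 and 1, so size - offset = 3 < used = 5 and the
   buffer wraps around the end of the allocation. */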
static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
	char *newdata = realloc(buf->data, newsize);

	if(!newdata) {
		return false;
	}

	buf->data = newdata;

	if(buffer_wraps(buf)) {
		// Shift the right part of the buffer until it hits the end of the new buffer.
		// [345.........|........012]
		uint32_t tailsize = buf->size - buf->offset;
		uint32_t newoffset = newsize - tailsize;
		memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
		buf->offset = newoffset;
	}

	buf->size = newsize;
	return true;
}
// Store data into the buffer
static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
	debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);

	// Ensure we don't store more than maxsize bytes in total
	size_t required = offset + len;

	if(required > buf->maxsize) {
		if(offset >= buf->maxsize) {
			return 0;
		}

		len = buf->maxsize - offset;
		required = buf->maxsize;
	}

	// Check if we need to resize the buffer
	if(required > buf->size) {
		size_t newsize = buf->size;

		if(!newsize) {
			newsize = required;
		} else {
			do {
				newsize *= 2;
			} while(newsize < required);
		}

		if(newsize > buf->maxsize) {
			newsize = buf->maxsize;
		}

		if(!buffer_resize(buf, newsize)) {
			return -1;
		}
	}

	uint32_t realoffset = buf->offset + offset;

	if(buf->size - buf->offset <= offset) {
		// The offset wrapped
		realoffset -= buf->size;
	}

	if(buf->size - realoffset < len) {
		// The new chunk of data must be wrapped
		memcpy(buf->data + realoffset, data, buf->size - realoffset);
		memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
	} else {
		memcpy(buf->data + realoffset, data, len);
	}

	if(required > buf->used) {
		buf->used = required;
	}

	return len;
}
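/* Illustrative example: a buffer of size 4096 with maxsize 65536 that needs
   to hold 5000 bytes is grown to 8192 (growth is always by doubling, capped
   at maxsize) before the new data is copied in, possibly split in two parts
   around the wrap point. */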
static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
	return buffer_put_at(buf, buf->used, data, len);
}
// Copy data from the buffer without removing it.
static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
	// Ensure we don't copy more than is actually stored in the buffer
	if(offset >= buf->used) {
		return 0;
	}

	if(buf->used - offset < len) {
		len = buf->used - offset;
	}

	uint32_t realoffset = buf->offset + offset;

	if(buf->size - buf->offset <= offset) {
		// The offset wrapped
		realoffset -= buf->size;
	}

	if(buf->size - realoffset < len) {
		// The data is wrapped
		memcpy(data, buf->data + realoffset, buf->size - realoffset);
		memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
	} else {
		memcpy(data, buf->data + realoffset, len);
	}

	return len;
}
// Pass data from the buffer to the receive callback without removing it.
static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
	if(!c->recv) {
		return len;
	}

	// Ensure we don't copy more than is actually stored in the buffer
	if(offset >= buf->used) {
		return 0;
	}

	if(buf->used - offset < len) {
		len = buf->used - offset;
	}

	uint32_t realoffset = buf->offset + offset;

	if(buf->size - buf->offset <= offset) {
		// The offset wrapped
		realoffset -= buf->size;
	}

	if(buf->size - realoffset < len) {
		// The data is wrapped
		ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);

		if(rx1 < (ssize_t)(buf->size - realoffset)) {
			return rx1;
		}

		// The channel might have been closed by the previous callback
		if(!c->recv) {
			return len;
		}

		ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));

		if(rx2 < 0) {
			return rx2;
		}

		return rx1 + rx2;
	} else {
		return c->recv(c, buf->data + realoffset, len);
	}
}
// Discard data from the buffer.
static ssize_t buffer_discard(struct buffer *buf, size_t len) {
	if(buf->used < len) {
		len = buf->used;
	}

	if(buf->size - buf->offset <= len) {
		buf->offset -= buf->size;
	}

	if(buf->used == len) {
		buf->offset = 0;
	} else {
		buf->offset += len;
	}

	buf->used -= len;

	return len;
}
static void buffer_clear(struct buffer *buf) {
	buf->used = 0;
	buf->offset = 0;
}
static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
	if(maxsize < minsize) {
		maxsize = minsize;
	}

	buf->maxsize = maxsize;

	return buf->size >= minsize || buffer_resize(buf, minsize);
}
static void buffer_exit(struct buffer *buf) {
	free(buf->data);
	memset(buf, 0, sizeof(*buf));
}
static uint32_t buffer_free(const struct buffer *buf) {
	return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0;
}
// Connections are stored in a sorted list.
// This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.

static int compare(const void *va, const void *vb) {
	assert(va && vb);

	const struct utcp_connection *a = *(struct utcp_connection **)va;
	const struct utcp_connection *b = *(struct utcp_connection **)vb;

	assert(a && b);
	assert(a->src && b->src);

	int c = (int)a->src - (int)b->src;

	if(c) {
		return c;
	}

	c = (int)a->dst - (int)b->dst;
	return c;
}

static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
	if(!utcp->nconnections) {
		return NULL;
	}

	struct utcp_connection key = {
		.src = src,
		.dst = dst,
	}, *keyp = &key;
	struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
	return match ? *match : NULL;
}
static void free_connection(struct utcp_connection *c) {
	struct utcp *utcp = c->utcp;
	struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);

	assert(cp);

	int i = cp - utcp->connections;
	memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
	utcp->nconnections--;

	buffer_exit(&c->rcvbuf);
	buffer_exit(&c->sndbuf);
	free(c);
}
static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
	// Check whether this combination of src and dst is free

	if(src) {
		if(find_connection(utcp, src, dst)) {
			errno = EADDRINUSE;
			return NULL;
		}
	} else { // If src == 0, generate a random port number with the high bit set
		if(utcp->nconnections >= 32767) {
			errno = ENOMEM;
			return NULL;
		}

		src = rand() | 0x8000;

		while(find_connection(utcp, src, dst)) {
			src++;
		}
	}

	// Allocate memory for the new connection

	if(utcp->nconnections >= utcp->nallocated) {
		if(!utcp->nallocated) {
			utcp->nallocated = 4;
		} else {
			utcp->nallocated *= 2;
		}

		struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));

		if(!new_array) {
			errno = ENOMEM;
			return NULL;
		}

		utcp->connections = new_array;
	}

	struct utcp_connection *c = calloc(1, sizeof(*c));

	if(!c) {
		errno = ENOMEM;
		return NULL;
	}

	if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
		free(c);
		errno = ENOMEM;
		return NULL;
	}

	if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
		buffer_exit(&c->sndbuf);
		free(c);
		errno = ENOMEM;
		return NULL;
	}

	// Fill in the details
	c->src = src;
	c->dst = dst;
	c->snd.iss = rand();
	c->snd.una = c->snd.iss;
	c->snd.nxt = c->snd.iss + 1;
	c->snd.last = c->snd.nxt;
	c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
	c->snd.ssthresh = ~0;
	debug_cwnd(c);
	c->rto = START_RTO;
	c->utcp = utcp;

	// Add it to the sorted list of connections

	utcp->connections[utcp->nconnections++] = c;
	qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);

	return c;
}
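/* The initial cwnd chosen above follows RFC 5681 section 3.1:
   IW = min(4 * MSS, max(2 * MSS, 4380 bytes)), i.e. 4 segments when
   MSS <= 1095, 3 segments when MSS <= 2190, and 2 segments otherwise. */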
static inline uint32_t absdiff(uint32_t a, uint32_t b) {
	return a > b ? a - b : b - a;
}
// Update RTT variables. See RFC 6298.
static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
	if(!rtt) {
		debug(c, "invalid rtt\n");
		return;
	}

	if(!c->srtt) {
		// First RTT measurement
		c->srtt = rtt;
		c->rttvar = rtt / 2;
	} else {
		c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4;
		c->srtt = (c->srtt * 7 + rtt) / 8;
	}

	c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY);

	if(c->rto > MAX_RTO) {
		c->rto = MAX_RTO;
	}

	debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto);
}
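/* For reference, RFC 6298 prescribes (G being the clock granularity):
     RTTVAR <- (3/4) * RTTVAR + (1/4) * |SRTT - R'|
     SRTT   <- (7/8) * SRTT + (1/8) * R'
     RTO    <- SRTT + max(G, 4 * RTTVAR)
   with SRTT = R' and RTTVAR = R' / 2 on the first measurement, which is
   exactly the integer arithmetic performed above. */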
static void start_retransmit_timer(struct utcp_connection *c) {
	clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);

	uint32_t rto = c->rto;

	while(rto > USEC_PER_SEC) {
		c->rtrx_timeout.tv_sec++;
		rto -= USEC_PER_SEC;
	}

	c->rtrx_timeout.tv_nsec += rto * 1000;

	if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
		c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
		c->rtrx_timeout.tv_sec++;
	}

	debug(c, "rtrx_timeout %ld.%06lu\n", (long)c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec / 1000);
}
static void start_flush_timer(struct utcp_connection *c) {
	clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);

	c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;

	if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
		c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
		c->rtrx_timeout.tv_sec++;
	}

	debug(c, "rtrx_timeout %ld.%06lu (flush)\n", (long)c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec / 1000);
}
static void stop_retransmit_timer(struct utcp_connection *c) {
	timespec_clear(&c->rtrx_timeout);
	debug(c, "rtrx_timeout cleared\n");
}
struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
	struct utcp_connection *c = allocate_connection(utcp, 0, dst);

	if(!c) {
		return NULL;
	}

	assert((flags & ~0x1f) == 0);

	c->flags = flags;
	c->recv = recv;
	c->priv = priv;

	struct {
		struct hdr hdr;
		uint8_t init[4];
	} pkt;

	pkt.hdr.src = c->src;
	pkt.hdr.dst = c->dst;
	pkt.hdr.seq = c->snd.iss;
	pkt.hdr.ack = 0;
	pkt.hdr.wnd = c->rcvbuf.maxsize;
	pkt.hdr.ctl = SYN;
	pkt.hdr.aux = 0x0101;
	pkt.init[0] = 1;
	pkt.init[1] = 0;
	pkt.init[2] = 0;
	pkt.init[3] = flags & 0x7;

	set_state(c, SYN_SENT);

	print_packet(c, "send", &pkt, sizeof(pkt));
	utcp->send(utcp, &pkt, sizeof(pkt));

	clock_gettime(UTCP_CLOCK, &c->conn_timeout);
	c->conn_timeout.tv_sec += utcp->timeout;

	start_retransmit_timer(c);

	return c;
}
struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
	return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
}
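/* Minimal usage sketch (illustrative only; my_send and my_recv are
   hypothetical application callbacks, with signatures assumed from how
   they are invoked in this file):

   static ssize_t my_recv(struct utcp_connection *c, const void *data, size_t len) {
           // Consume len bytes of stream data; return how much was accepted.
           return len;
   }

   static ssize_t my_send(struct utcp *utcp, const void *pkt, size_t len) {
           // Hand the packet to the underlying datagram transport, e.g. sendto().
           return len;
   }

   struct utcp *u = utcp_init(NULL, NULL, my_send, NULL);
   struct utcp_connection *c = utcp_connect(u, 1234, my_recv, NULL);
   // Feed incoming datagrams to utcp_recv(u, buf, buflen) and call
   // utcp_timeout(u) periodically to drive retransmissions. */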
void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
	if(c->reapable || c->state != SYN_RECEIVED) {
		debug(c, "accept() called on invalid connection %p in state %s\n", c, strstate[c->state]);
		return;
	}

	debug(c, "accepted %p %p %p\n", c, recv, priv);
	c->recv = recv;
	c->priv = priv;
	set_state(c, ESTABLISHED);
}
static void ack(struct utcp_connection *c, bool sendatleastone) {
	int32_t left = seqdiff(c->snd.last, c->snd.nxt);
	int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE;

	assert(left >= 0);

	if(cwndleft <= 0) {
		left = 0;
	} else if(cwndleft < left) {
		left = cwndleft;

		if(!sendatleastone || cwndleft > c->utcp->mss) {
			left -= left % c->utcp->mss;
		}
	}

	debug(c, "cwndleft %d left %d\n", cwndleft, left);

	if(!left && !sendatleastone) {
		return;
	}

	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.ack = c->rcv.nxt;
	pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0;
	pkt->hdr.ctl = ACK;
	pkt->hdr.aux = 0;

	do {
		uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
		pkt->hdr.seq = c->snd.nxt;

		buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);

		c->snd.nxt += seglen;
		left -= seglen;

		if(!is_reliable(c)) {
			if(left) {
				pkt->hdr.ctl |= MF;
			} else {
				pkt->hdr.ctl &= ~MF;
			}
		}

		if(seglen && fin_wanted(c, c->snd.nxt)) {
			seglen--;
			pkt->hdr.ctl |= FIN;
		}

		if(!c->rtt_start.tv_sec && is_reliable(c)) {
			// Start RTT measurement
			clock_gettime(UTCP_CLOCK, &c->rtt_start);
			c->rtt_seq = pkt->hdr.seq + seglen;
			debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
		}

		print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
		c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);

		if(left && !is_reliable(c)) {
			pkt->hdr.wnd += seglen;
		}
	} while(left);
}
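/* Note (editorial): for unreliable connections the code above reuses header
   fields that reliable mode uses differently: wnd carries the fragment
   offset of the payload within the reassembled packet, and the MF ("more
   fragments") control flag is set on every fragment except the last. The
   receiving side of this scheme is handle_unreliable() further down. */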
static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
	size_t rlen = len + (is_framed(c) ? 2 : 0);

	if(!rlen) {
		return 0;
	}

	// Check if we need to be able to buffer all data

	if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
		if(rlen > c->sndbuf.maxsize) {
			errno = EMSGSIZE;
			return -1;
		}

		if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
			errno = EMSGSIZE;
			return -1;
		}

		if(rlen > buffer_free(&c->sndbuf)) {
			errno = EWOULDBLOCK;
			return 0;
		}
	}

	// Add data to the send buffer.

	if(is_framed(c)) {
		uint16_t len16 = len;
		buffer_put(&c->sndbuf, &len16, sizeof(len16));
		assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
	} else {
		len = buffer_put(&c->sndbuf, data, len);

		if(len <= 0) {
			errno = EWOULDBLOCK;
			return 0;
		}

		rlen = len;
	}

	c->snd.last += rlen;

	// Don't send anything yet if the connection has not fully established yet

	if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
		return len;
	}

	ack(c, false);

	if(!timespec_isset(&c->rtrx_timeout)) {
		start_retransmit_timer(c);
	}

	if(!timespec_isset(&c->conn_timeout)) {
		clock_gettime(UTCP_CLOCK, &c->conn_timeout);
		c->conn_timeout.tv_sec += c->utcp->timeout;
	}

	return len;
}
/* In the send buffer we can have multiple frames, each prefixed with their
   length. However, the first frame might already have been partially sent. The
   variable c->frame_offset tracks how much of a partial frame is left at the
   start. If it is 0, it means there is no partial frame, and the first two
   bytes in the send buffer are the length of the first frame.

   After sending an MSS sized packet, we need to calculate the new frame_offset
   value, since it is likely that the next packet will also have a partial frame
   at the start. We do this by scanning the previously sent packet for frame
   headers, to find out how many bytes of the last frame are left to send.
*/
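/* Illustrative example (not from the original source): suppose two frames of
   100 and 200 bytes are queued, so the buffer holds 102 + 202 = 304 bytes
   including the two 2-byte length prefixes. After sending a 150-byte segment,
   the scan visits frame headers at offsets 0 and 102 and stops at 304;
   subtracting the 150 sent bytes leaves frame_offset = 154, the number of
   bytes of the second frame still waiting at the start of the buffer. */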
static void ack_unreliable_framed(struct utcp_connection *c) {
	int32_t left = seqdiff(c->snd.last, c->snd.nxt);
	assert(left > 0);

	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.ack = c->rcv.nxt;
	pkt->hdr.ctl = ACK | MF;
	pkt->hdr.aux = 0;

	bool sent_packet = false;

	while(left >= c->utcp->mss) {
		pkt->hdr.wnd = c->frame_offset;
		uint32_t seglen = c->utcp->mss;

		pkt->hdr.seq = c->snd.nxt;

		buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);

		c->snd.nxt += seglen;
		c->snd.una = c->snd.nxt;
		left -= seglen;

		print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
		c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
		sent_packet = true;

		// Calculate the new frame offset
		while(c->frame_offset < seglen) {
			uint16_t framelen;
			buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
			c->frame_offset += framelen + 2;
		}

		buffer_discard(&c->sndbuf, seglen);
		c->frame_offset -= seglen;
	}

	if(left && sent_packet) {
		// We sent one packet but we have partial data left, (re)start the flush timer
		start_flush_timer(c);
	} else if(!left) {
		// There is no partial data in the send buffer, so stop the flush timer
		stop_retransmit_timer(c);
	}
}
static void flush_unreliable_framed(struct utcp_connection *c) {
	int32_t left = seqdiff(c->snd.last, c->snd.nxt);

	/* If the MSS dropped since last time ack_unreliable_framed() was called,
	   we might now have more than one segment worth of data left.
	 */
	if(left > c->utcp->mss) {
		ack_unreliable_framed(c);
		left = seqdiff(c->snd.last, c->snd.nxt);
		assert(left <= c->utcp->mss);
	}

	if(left) {
		struct {
			struct hdr hdr;
			uint8_t data[];
		} *pkt = c->utcp->pkt;

		pkt->hdr.src = c->src;
		pkt->hdr.dst = c->dst;
		pkt->hdr.seq = c->snd.nxt;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.wnd = c->frame_offset;
		pkt->hdr.ctl = ACK | MF;
		pkt->hdr.aux = 0;

		uint32_t seglen = left;

		buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
		buffer_discard(&c->sndbuf, seglen);

		c->snd.nxt += seglen;
		c->snd.una = c->snd.nxt;

		print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
		c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
	}

	c->frame_offset = 0;
	stop_retransmit_timer(c);
}
static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
	if(len > MAX_UNRELIABLE_SIZE) {
		errno = EMSGSIZE;
		return -1;
	}

	size_t rlen = len + (is_framed(c) ? 2 : 0);

	if(rlen > buffer_free(&c->sndbuf)) {
		if(rlen > c->sndbuf.maxsize) {
			errno = EMSGSIZE;
			return -1;
		} else {
			errno = EWOULDBLOCK;
			return 0;
		}
	}

	// Don't send anything yet if the connection has not fully established yet

	if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
		return len;
	}

	if(is_framed(c)) {
		uint16_t framelen = len;
		buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
	}

	buffer_put(&c->sndbuf, data, len);

	c->snd.last += rlen;

	if(is_framed(c)) {
		ack_unreliable_framed(c);
	} else {
		ack(c, false);
		c->snd.una = c->snd.nxt = c->snd.last;
		buffer_discard(&c->sndbuf, c->sndbuf.used);
	}

	return len;
}
ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
	if(c->reapable) {
		debug(c, "send() called on closed connection\n");
		errno = EBADF;
		return -1;
	}

	switch(c->state) {
	case CLOSED:
	case LISTEN:
		debug(c, "send() called on unconnected connection\n");
		errno = ENOTCONN;
		return -1;

	case SYN_SENT:
	case SYN_RECEIVED:
	case ESTABLISHED:
	case CLOSE_WAIT:
		break;

	case FIN_WAIT_1:
	case FIN_WAIT_2:
	case CLOSING:
	case LAST_ACK:
	case TIME_WAIT:
		debug(c, "send() called on closed connection\n");
		errno = EPIPE;
		return -1;
	}

	if(!len) {
		return 0;
	}

	if(!data) {
		errno = EFAULT;
		return -1;
	}

	if(is_reliable(c)) {
		return utcp_send_reliable(c, data, len);
	} else {
		return utcp_send_unreliable(c, data, len);
	}
}
static void swap_ports(struct hdr *hdr) {
	uint16_t tmp = hdr->src;
	hdr->src = hdr->dst;
	hdr->dst = tmp;
}
static void fast_retransmit(struct utcp_connection *c) {
	if(c->state == CLOSED || c->snd.last == c->snd.una) {
		debug(c, "fast_retransmit() called but nothing to retransmit!\n");
		return;
	}

	struct utcp *utcp = c->utcp;

	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.wnd = c->rcvbuf.maxsize;
	pkt->hdr.aux = 0;

	switch(c->state) {
	case ESTABLISHED:
	case FIN_WAIT_1:
	case CLOSE_WAIT:
	case CLOSING:
	case LAST_ACK:
		// Send unacked data again.
		pkt->hdr.seq = c->snd.una;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = ACK;
		uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

		if(fin_wanted(c, c->snd.una + len)) {
			len--;
			pkt->hdr.ctl |= FIN;
		}

		buffer_copy(&c->sndbuf, pkt->data, 0, len);
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
		break;

	default:
		break;
	}
}
static void retransmit(struct utcp_connection *c) {
	if(c->state == CLOSED || c->snd.last == c->snd.una) {
		debug(c, "retransmit() called but nothing to retransmit!\n");
		stop_retransmit_timer(c);
		return;
	}

	struct utcp *utcp = c->utcp;

	if(utcp->retransmit && is_reliable(c)) {
		utcp->retransmit(c);
	}

	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.wnd = c->rcvbuf.maxsize;
	pkt->hdr.aux = 0;

	switch(c->state) {
	case SYN_SENT:
		// Send our SYN again
		pkt->hdr.seq = c->snd.iss;
		pkt->hdr.ack = 0;
		pkt->hdr.ctl = SYN;
		pkt->hdr.aux = 0x0101;
		pkt->data[0] = 1;
		pkt->data[1] = 0;
		pkt->data[2] = 0;
		pkt->data[3] = c->flags & 0x7;
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
		break;

	case SYN_RECEIVED:
		// Send SYNACK again
		pkt->hdr.seq = c->snd.nxt;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = SYN | ACK;
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
		utcp->send(utcp, pkt, sizeof(pkt->hdr));
		break;

	case ESTABLISHED:
	case FIN_WAIT_1:
	case CLOSE_WAIT:
	case CLOSING:
	case LAST_ACK:
		if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
			flush_unreliable_framed(c);
			return;
		}

		// Send unacked data again.
		pkt->hdr.seq = c->snd.una;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = ACK;
		uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

		if(fin_wanted(c, c->snd.una + len)) {
			len--;
			pkt->hdr.ctl |= FIN;
		}

		// RFC 5681 slow start after timeout
		uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
		c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
		c->snd.cwnd = utcp->mss;
		debug_cwnd(c);

		buffer_copy(&c->sndbuf, pkt->data, 0, len);
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);

		c->snd.nxt = c->snd.una + len;
		break;

	case CLOSED:
	case LISTEN:
	case FIN_WAIT_2:
	case TIME_WAIT:
		// We shouldn't need to retransmit anything in this state.
#ifdef UTCP_DEBUG
		abort();
#endif
		stop_retransmit_timer(c);
		return;
	}

	start_retransmit_timer(c);
	c->rto *= 2;

	if(c->rto > MAX_RTO) {
		c->rto = MAX_RTO;
	}

	c->rtt_start.tv_sec = 0; // invalidate RTT timer
	c->dupack = 0; // cancel any ongoing fast recovery
}
/* Update receive buffer and SACK entries after consuming data.
 *
 * Situation:
 *
 * |.....0000..1111111111.....22222......3333|
 * |---------------^
 *
 * 0..3 represent the SACK entries. The ^ indicates up to which point we want
 * to remove data from the receive buffer. The idea is to subtract "len"
 * from the offset of all the SACK entries, and then remove/cut down entries
 * that are shifted to before the start of the receive buffer.
 *
 * There are three cases:
 * - the SACK entry is after ^, in that case just change the offset.
 * - the SACK entry starts before and ends after ^, so we have to
 *   change both its offset and size.
 * - the SACK entry is completely before ^, in that case delete it.
 */
static void sack_consume(struct utcp_connection *c, size_t len) {
	debug(c, "sack_consume %lu\n", (unsigned long)len);

	if(len > c->rcvbuf.used) {
		debug(c, "all SACK entries consumed\n");
		c->sacks[0].len = 0;
		return;
	}

	buffer_discard(&c->rcvbuf, len);

	for(int i = 0; i < NSACKS && c->sacks[i].len;) {
		if(len < c->sacks[i].offset) {
			c->sacks[i].offset -= len;
			i++;
		} else if(len < c->sacks[i].offset + c->sacks[i].len) {
			c->sacks[i].len -= len - c->sacks[i].offset;
			c->sacks[i].offset = 0;
			i++;
		} else {
			if(i < NSACKS - 1) {
				memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
				c->sacks[NSACKS - 1].len = 0;
			} else {
				c->sacks[i].len = 0;
				break;
			}
		}
	}

	for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
		debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
	}
}
static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
	debug(c, "out of order packet, offset %u\n", offset);
	// Packet loss or reordering occurred. Store the data in the buffer.
	ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);

	if(rxd <= 0) {
		debug(c, "packet outside receive buffer, dropping\n");
		return;
	}

	if((size_t)rxd < len) {
		debug(c, "packet partially outside receive buffer\n");
		len = rxd;
	}

	// Make note of where we put it.
	for(int i = 0; i < NSACKS; i++) {
		if(!c->sacks[i].len) { // nothing to merge, add new entry
			debug(c, "new SACK entry %d\n", i);
			c->sacks[i].offset = offset;
			c->sacks[i].len = rxd;
			break;
		} else if(offset < c->sacks[i].offset) {
			if(offset + rxd < c->sacks[i].offset) { // insert before
				if(!c->sacks[NSACKS - 1].len) { // only if room left
					debug(c, "insert SACK entry at %d\n", i);
					memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
					c->sacks[i].offset = offset;
					c->sacks[i].len = rxd;
				} else {
					debug(c, "SACK entries full, dropping packet\n");
					return;
				}

				break;
			} else { // merge
				debug(c, "merge with start of SACK entry at %d\n", i);
				c->sacks[i].offset = offset;
				break;
			}
		} else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
			if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
				debug(c, "merge with end of SACK entry at %d\n", i);
				c->sacks[i].len = offset + rxd - c->sacks[i].offset;
				// TODO: handle potential merge with next entry
			}

			break;
		}
	}

	for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
		debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
	}
}
static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
	uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;

	// Put the data into the receive buffer
	handle_out_of_order(c, offset + in_order_offset, data, len);
}
static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
	if(c->recv) {
		ssize_t rxd = c->recv(c, data, len);

		if(rxd != (ssize_t)len) {
			// TODO: handle the application not accepting all data.
			abort();
		}
	}

	// Check if we can process out-of-order data now.
	if(c->sacks[0].len && len >= c->sacks[0].offset) {
		debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);

		if(len < c->sacks[0].offset + c->sacks[0].len) {
			size_t offset = len;
			len = c->sacks[0].offset + c->sacks[0].len;
			size_t remainder = len - offset;

			ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);

			if(rxd != (ssize_t)remainder) {
				// TODO: handle the application not accepting all data.
				abort();
			}
		}
	}

	if(c->rcvbuf.used) {
		sack_consume(c, len);
	}

	c->rcv.nxt += len;
}
static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
	// Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
	uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
	handle_out_of_order(c, in_order_offset, data, len);

	// While we have full frames at the start, give them to the application
	while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
		uint16_t framelen;
		buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(framelen));

		if(framelen > c->sacks[0].len - 2) {
			break;
		}

		if(c->recv) {
			ssize_t rxd;
			uint32_t realoffset = c->rcvbuf.offset + 2;

			if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
				realoffset -= c->rcvbuf.size;
			}

			if(realoffset > c->rcvbuf.size - framelen) {
				// The buffer wraps, we need to copy
				uint8_t buf[framelen];
				buffer_copy(&c->rcvbuf, buf, 2, framelen);
				rxd = c->recv(c, buf, framelen);
			} else {
				// The frame is contiguous in the receive buffer
				rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
			}

			if(rxd != (ssize_t)framelen) {
				// TODO: handle the application not accepting all data.
				abort();
			}
		}

		sack_consume(c, framelen + 2);
	}

	c->rcv.nxt += len;
}
static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
	// Fast path for unfragmented packets
	if(!hdr->wnd && !(hdr->ctl & MF)) {
		if(c->recv) {
			c->recv(c, data, len);
		}

		c->rcv.nxt = hdr->seq + len;
		return;
	}

	// Ensure reassembled packets are not larger than 64 kiB
	if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
		return;
	}

	// Don't accept out of order fragments
	if(hdr->wnd && hdr->seq != c->rcv.nxt) {
		return;
	}

	// Reset the receive buffer for the first fragment
	if(!hdr->wnd) {
		buffer_clear(&c->rcvbuf);
	}

	ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len);

	if(rxd != (ssize_t)len) {
		return;
	}

	// Hand the packet to the application if it's the final fragment
	if(!(hdr->ctl & MF)) {
		buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);
	}

	c->rcv.nxt = hdr->seq + len;
}
static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
	bool in_order = hdr->seq == c->rcv.nxt;
	c->rcv.nxt = hdr->seq + len;

	const uint8_t *ptr = data;
	size_t left = len;

	// Does it start with a partial frame?
	if(hdr->wnd) {
		// Only accept the data if it is in order
		if(in_order && c->rcvbuf.used) {
			// In order, append it to the receive buffer
			buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));

			if(hdr->wnd <= len) {
				// We have a full frame
				c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
			}
		}

		// Exit early if the partial frame extends into a later packet
		if(hdr->wnd > len) {
			if(!in_order) {
				buffer_clear(&c->rcvbuf);
			}

			return;
		}

		ptr += hdr->wnd;
		left -= hdr->wnd;
	}

	// We now start with new frames, so clear any data in the receive buffer
	buffer_clear(&c->rcvbuf);

	// Handle whole frames
	while(left > 2) {
		uint16_t framelen;
		memcpy(&framelen, ptr, sizeof(framelen));

		if(left <= (size_t)framelen + 2) {
			break;
		}

		c->recv(c, ptr + 2, framelen);
		ptr += framelen + 2;
		left -= framelen + 2;
	}

	// Handle the partial last frame
	if(left) {
		buffer_put(&c->rcvbuf, ptr, left);
	}
}
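/* Illustrative wire layout for framed mode (not from the original source):
   each frame is a host-endian uint16_t length followed by that many payload
   bytes, so the two frames "ab" and "xyz" occupy
   02 00 61 62 03 00 78 79 7a on a little-endian machine. */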
static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
	if(!is_reliable(c)) {
		if(is_framed(c)) {
			handle_unreliable_framed(c, hdr, data, len);
		} else {
			handle_unreliable(c, hdr, data, len);
		}

		return;
	}

	uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);

	if(is_framed(c)) {
		if(offset) {
			handle_out_of_order_framed(c, offset, data, len);
		} else {
			handle_in_order_framed(c, data, len);
		}
	} else {
		if(offset) {
			handle_out_of_order(c, offset, data, len);
		} else {
			handle_in_order(c, data, len);
		}
	}
}
ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
	const uint8_t *ptr = data;

	if(!utcp) {
		errno = EFAULT;
		return -1;
	}

	if(!len) {
		return 0;
	}

	if(!data) {
		errno = EFAULT;
		return -1;
	}

	// Drop packets smaller than the header

	struct hdr hdr;

	if(len < sizeof(hdr)) {
		print_packet(NULL, "recv", data, len);
		errno = EBADMSG;
		return -1;
	}

	// Make a copy from the potentially unaligned data to a struct hdr

	memcpy(&hdr, ptr, sizeof(hdr));

	// Try to match the packet to an existing connection

	struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
	print_packet(c, "recv", data, len);

	// Process the header

	ptr += sizeof(hdr);
	len -= sizeof(hdr);

	// Drop packets with an unknown CTL flag

	if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) {
		print_packet(NULL, "recv", data, len);
		errno = EBADMSG;
		return -1;
	}

	// Check for auxiliary headers

	const uint8_t *init = NULL;

	uint16_t aux = hdr.aux;

	while(aux) {
		size_t auxlen = 4 * ((aux >> 8) & 0xf);
		uint8_t auxtype = aux & 0xff;

		if(len < auxlen) {
			errno = EBADMSG;
			return -1;
		}

		switch(auxtype) {
		case AUX_INIT:
			if(!(hdr.ctl & SYN) || auxlen != 4) {
				errno = EBADMSG;
				return -1;
			}

			init = ptr;
			break;

		default:
			errno = EBADMSG;
			return -1;
		}

		len -= auxlen;
		ptr += auxlen;

		if(!(aux & 0x800)) {
			break;
		}

		if(len < 2) {
			errno = EBADMSG;
			return -1;
		}

		memcpy(&aux, ptr, 2);
		len -= 2;
		ptr += 2;
	}

	bool has_data = len || (hdr.ctl & (SYN | FIN));

	// Is it for a new connection?

	if(!c) {
		// Ignore RST packets

		if(hdr.ctl & RST) {
			return 0;
		}

		// Is it a SYN packet and are we LISTENing?

		if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
			// If we don't want to accept it, send a RST back
			if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
				goto reset;
			}

			// Try to allocate memory, otherwise send a RST back
			c = allocate_connection(utcp, hdr.dst, hdr.src);

			if(!c) {
				goto reset;
			}

			// Parse auxiliary information
			if(init) {
				if(init[0] < 1) {
					goto reset;
				}

				c->flags = init[3] & 0x7;
			} else {
				c->flags = UTCP_TCP;
			}

synack:
			// Return SYN+ACK, go to SYN_RECEIVED state
			c->snd.wnd = hdr.wnd;
			c->rcv.irs = hdr.seq;
			c->rcv.nxt = c->rcv.irs + 1;
			set_state(c, SYN_RECEIVED);

			struct {
				struct hdr hdr;
				uint8_t data[4];
			} pkt;

			pkt.hdr.src = c->src;
			pkt.hdr.dst = c->dst;
			pkt.hdr.ack = c->rcv.irs + 1;
			pkt.hdr.seq = c->snd.iss;
			pkt.hdr.wnd = c->rcvbuf.maxsize;
			pkt.hdr.ctl = SYN | ACK;

			if(init) {
				pkt.hdr.aux = 0x0101;
				pkt.data[0] = 1;
				pkt.data[1] = 0;
				pkt.data[2] = 0;
				pkt.data[3] = c->flags & 0x7;
				print_packet(c, "send", &pkt, sizeof(hdr) + 4);
				utcp->send(utcp, &pkt, sizeof(hdr) + 4);
			} else {
				pkt.hdr.aux = 0;
				print_packet(c, "send", &pkt, sizeof(hdr));
				utcp->send(utcp, &pkt, sizeof(hdr));
			}

			start_retransmit_timer(c);
		} else {
			// No, we don't want your packets, send a RST back
			goto reset;
		}

		return 0;
	}

	debug(c, "state %s\n", strstate[c->state]);

	// In case this is for a CLOSED connection, ignore the packet.
	// TODO: make it so incoming packets can never match a CLOSED connection.

	if(c->state == CLOSED) {
		debug(c, "got packet for closed connection\n");
		return 0;
	}

	// It is for an existing connection.

	// 1. Drop invalid packets.

	// 1a. Drop packets that should not happen in our current state.

	switch(c->state) {
	case SYN_SENT:
	case SYN_RECEIVED:
	case ESTABLISHED:
	case FIN_WAIT_1:
	case FIN_WAIT_2:
	case CLOSE_WAIT:
	case CLOSING:
	case LAST_ACK:
	case TIME_WAIT:
		break;

	default:
#ifdef UTCP_DEBUG
		abort();
#endif
		break;
	}

	// 1b. Discard data that is not in our receive window.

	if(is_reliable(c)) {
		bool acceptable;

		if(c->state == SYN_SENT) {
			acceptable = true;
		} else if(len == 0) {
			acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
		} else {
			int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);

			// cut already accepted front overlapping
			if(rcv_offset < 0) {
				acceptable = len > (size_t) - rcv_offset;

				if(acceptable) {
					ptr -= rcv_offset;
					len += rcv_offset;
					hdr.seq -= rcv_offset;
				}
			} else {
				acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
			}
		}

		if(!acceptable) {
			debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);

			// Ignore unacceptable RST packets.
			if(hdr.ctl & RST) {
				return 0;
			}

			// Otherwise, continue processing.
			len = 0;
		}
	} else {
		int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);

		if(rcv_offset) {
			debug(c, "packet out of order, offset %d bytes\n", rcv_offset);
		}
	}

	c->snd.wnd = hdr.wnd; // TODO: move below

	// 1c. Drop packets with an invalid ACK.
	// ackno should not roll back, and it should also not be bigger than what we ever could have sent
	// (= snd.una + c->sndbuf.used).

	if(!is_reliable(c)) {
		if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
			hdr.ack = c->snd.una;
		}
	}

	if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
		debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);

		// Ignore unacceptable RST packets.
		if(hdr.ctl & RST) {
			return 0;
		}

		goto reset;
	}

	// 2. Handle RST packets

	if(hdr.ctl & RST) {
		switch(c->state) {
		case SYN_SENT:
			if(!(hdr.ctl & ACK)) {
				return 0;
			}

			// The peer has refused our connection.
			set_state(c, CLOSED);
			errno = ECONNREFUSED;

			if(c->recv) {
				c->recv(c, NULL, 0);
			}

			if(c->poll && !c->reapable) {
				c->poll(c, 0);
			}

			return 0;

		case SYN_RECEIVED:
			if(hdr.ctl & ACK) {
				return 0;
			}

			// We haven't told the application about this connection yet. Silently delete.
			free_connection(c);
			return 0;

		case ESTABLISHED:
		case FIN_WAIT_1:
		case FIN_WAIT_2:
		case CLOSE_WAIT:
			if(hdr.ctl & ACK) {
				return 0;
			}

			// The peer has aborted our connection.
			set_state(c, CLOSED);
			errno = ECONNRESET;

			if(c->recv) {
				c->recv(c, NULL, 0);
			}

			if(c->poll && !c->reapable) {
				c->poll(c, 0);
			}

			return 0;

		case CLOSING:
		case LAST_ACK:
		case TIME_WAIT:
			if(hdr.ctl & ACK) {
				return 0;
			}

			// As far as the application is concerned, the connection has already been closed.
			// If it has called utcp_close() already, we can immediately free this connection.
			if(c->reapable) {
				free_connection(c);
				return 0;
			}

			// Otherwise, immediately move to the CLOSED state.
			set_state(c, CLOSED);
			return 0;

		default:
#ifdef UTCP_DEBUG
			abort();
#endif
			break;
		}
	}

	int32_t advanced;

	if(!(hdr.ctl & ACK)) {
		advanced = 0;
		goto skip_ack;
	}

	// 3. Advance snd.una

	advanced = seqdiff(hdr.ack, c->snd.una);

	if(advanced) {
		// RTT measurement
		if(c->rtt_start.tv_sec) {
			if(c->rtt_seq == hdr.ack) {
				struct timespec now;
				clock_gettime(UTCP_CLOCK, &now);
				int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
				update_rtt(c, diff);
				c->rtt_start.tv_sec = 0;
			} else if(c->rtt_seq < hdr.ack) {
				debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
				c->rtt_start.tv_sec = 0;
			}
		}

		int32_t data_acked = advanced;

		switch(c->state) {
		case SYN_SENT:
		case SYN_RECEIVED:
			data_acked--;
			break;

		// TODO: handle FIN as well.
		default:
			break;
		}

		assert(data_acked >= 0);

#ifndef NDEBUG
		int32_t bufused = seqdiff(c->snd.last, c->snd.una);
		assert(data_acked <= bufused);
#endif

		if(data_acked) {
			buffer_discard(&c->sndbuf, data_acked);

			if(is_reliable(c)) {
				c->do_poll = true;
			}
		}

		// Also advance snd.nxt if possible
		if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
			c->snd.nxt = hdr.ack;
		}

		c->snd.una = hdr.ack;

		if(c->dupack) {
			if(c->dupack >= 3) {
				debug(c, "fast recovery ended\n");
				c->snd.cwnd = c->snd.ssthresh;
			}

			c->dupack = 0;
		}

		// Increase the congestion window according to RFC 5681
		if(c->snd.cwnd < c->snd.ssthresh) {
			c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
		} else {
			c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
		}

		if(c->snd.cwnd > c->sndbuf.maxsize) {
			c->snd.cwnd = c->sndbuf.maxsize;
		}

		debug_cwnd(c);

		// Check if we have sent a FIN that is now ACKed.
		switch(c->state) {
		case FIN_WAIT_1:
			if(c->snd.una == c->snd.last) {
				set_state(c, FIN_WAIT_2);
			}

			break;

		case CLOSING:
			if(c->snd.una == c->snd.last) {
				clock_gettime(UTCP_CLOCK, &c->conn_timeout);
				c->conn_timeout.tv_sec += utcp->timeout;
				set_state(c, TIME_WAIT);
			}

			break;

		default:
			break;
		}
	} else {
		if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
			c->dupack++;
			debug(c, "duplicate ACK %d\n", c->dupack);

			if(c->dupack == 3) {
				// RFC 5681 fast recovery
				debug(c, "fast recovery started, dupack %d\n", c->dupack);
				uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
				c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
				c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);

				if(c->snd.cwnd > c->sndbuf.maxsize) {
					c->snd.cwnd = c->sndbuf.maxsize;
				}

				debug_cwnd(c);

				fast_retransmit(c);
			} else if(c->dupack > 3) {
				c->snd.cwnd += utcp->mss;

				if(c->snd.cwnd > c->sndbuf.maxsize) {
					c->snd.cwnd = c->sndbuf.maxsize;
				}

				debug_cwnd(c);

				// We got an ACK which indicates the other side did get one of our packets.
				// Reset the retransmission timer to avoid going to slow start,
				// but don't touch the connection timeout.
				start_retransmit_timer(c);
			}
		}
	}

	// 4. Update timers

	if(advanced) {
		if(c->snd.una == c->snd.last) {
			stop_retransmit_timer(c);
			timespec_clear(&c->conn_timeout);
		} else if(is_reliable(c)) {
			start_retransmit_timer(c);
			clock_gettime(UTCP_CLOCK, &c->conn_timeout);
			c->conn_timeout.tv_sec += utcp->timeout;
		}
	}

skip_ack:
	// 5. Process SYN stuff

	if(hdr.ctl & SYN) {
		switch(c->state) {
		case SYN_SENT:
			// This is a SYNACK. It should always have ACKed the SYN.
			if(!advanced) {
				goto reset;
			}

			c->rcv.irs = hdr.seq;
			c->rcv.nxt = hdr.seq + 1;

			if(c->shut_wr) {
				c->snd.last++;
				set_state(c, FIN_WAIT_1);
			} else {
				c->do_poll = true;
				set_state(c, ESTABLISHED);
			}

			break;

		case SYN_RECEIVED:
			// This is a retransmit of a SYN, send back the SYNACK.
			goto synack;

		case ESTABLISHED:
		case FIN_WAIT_1:
		case FIN_WAIT_2:
		case CLOSE_WAIT:
		case CLOSING:
		case LAST_ACK:
		case TIME_WAIT:
			// This could be a retransmission. Ignore the SYN flag, but send an ACK back.
			break;

		default:
#ifdef UTCP_DEBUG
			abort();
#endif
			return 0;
		}
	}

	// 6. Process new data

	if(c->state == SYN_RECEIVED) {
		// This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
		if(!advanced) {
			goto reset;
		}

		// Are we still LISTENing?
		if(utcp->accept) {
			utcp->accept(c, c->src);
		}

		if(c->state != ESTABLISHED) {
			set_state(c, CLOSED);
			c->reapable = true;
			goto reset;
		}
	}

	if(len) {
		switch(c->state) {
		case SYN_SENT:
		case SYN_RECEIVED:
			// This should never happen.
#ifdef UTCP_DEBUG
			abort();
#endif
			return 0;

		case ESTABLISHED:
		case FIN_WAIT_1:
		case FIN_WAIT_2:
			break;

		case CLOSE_WAIT:
		case CLOSING:
		case LAST_ACK:
		case TIME_WAIT:
			// Ehm no, we should never receive more data after a FIN.
#ifdef UTCP_DEBUG
			abort();
#endif
			return 0;

		default:
#ifdef UTCP_DEBUG
			abort();
#endif
			return 0;
		}

		handle_incoming_data(c, &hdr, ptr, len);
	}

	// 7. Process FIN stuff

	if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
		switch(c->state) {
		case SYN_SENT:
		case SYN_RECEIVED:
			// This should never happen.
#ifdef UTCP_DEBUG
			abort();
#endif
			break;

		case ESTABLISHED:
			set_state(c, CLOSE_WAIT);
			break;

		case FIN_WAIT_1:
			set_state(c, CLOSING);
			break;

		case FIN_WAIT_2:
			clock_gettime(UTCP_CLOCK, &c->conn_timeout);
			c->conn_timeout.tv_sec += utcp->timeout;
			set_state(c, TIME_WAIT);
			break;

		case CLOSE_WAIT:
		case CLOSING:
		case LAST_ACK:
		case TIME_WAIT:
			// Ehm, no. We should never receive a second FIN.
#ifdef UTCP_DEBUG
			abort();
#endif
			break;

		default:
#ifdef UTCP_DEBUG
			abort();
#endif
			break;
		}

		// FIN counts as one sequence number
		c->rcv.nxt++;
		len++;

		// Inform the application that the peer closed its end of the connection.
		if(c->recv) {
			errno = 0;
			c->recv(c, NULL, 0);
		}
	}

	// Now we send something back if:
	// - we received data, so we have to send back an ACK
	//   -> sendatleastone = true
	// - or we got an ack, so we should maybe send a bit more data
	//   -> sendatleastone = false

	if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
		ack(c, has_data);
	}

	return 0;

reset:
	swap_ports(&hdr);
	hdr.wnd = 0;
	hdr.aux = 0;

	if(hdr.ctl & ACK) {
		hdr.seq = hdr.ack;
		hdr.ctl = RST;
	} else {
		hdr.ack = hdr.seq + len;
		hdr.seq = 0;
		hdr.ctl = RST | ACK;
	}

	print_packet(c, "send", &hdr, sizeof(hdr));
	utcp->send(utcp, &hdr, sizeof(hdr));
	return 0;
}
int utcp_shutdown(struct utcp_connection *c, int dir) {
	debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);

	if(!c) {
		errno = EFAULT;
		return -1;
	}

	if(c->reapable) {
		debug(c, "shutdown() called on closed connection\n");
		errno = EBADF;
		return -1;
	}

	if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
		errno = EINVAL;
		return -1;
	}

	// TCP does not have a provision for stopping incoming packets.
	// The best we can do is to just ignore them.
	if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
		c->recv = NULL;
	}

	// The rest of the code deals with shutting down writes.
	if(dir == UTCP_SHUT_RD) {
		return 0;
	}

	// Only process shutting down writes once.
	if(c->shut_wr) {
		return 0;
	}

	c->shut_wr = true;

	switch(c->state) {
	case CLOSED:
	case LISTEN:
		errno = ENOTCONN;
		return -1;

	case SYN_SENT:
		return 0;

	case SYN_RECEIVED:
	case ESTABLISHED:
		if(!is_reliable(c) && is_framed(c)) {
			flush_unreliable_framed(c);
		}

		set_state(c, FIN_WAIT_1);
		break;

	case FIN_WAIT_1:
	case FIN_WAIT_2:
		return 0;

	case CLOSE_WAIT:
		set_state(c, CLOSING);
		break;

	case CLOSING:
	case LAST_ACK:
	case TIME_WAIT:
		return 0;
	}

	c->snd.last++;

	ack(c, !is_reliable(c));

	if(!timespec_isset(&c->rtrx_timeout)) {
		start_retransmit_timer(c);
	}

	return 0;
}
static bool reset_connection(struct utcp_connection *c) {
	if(!c) {
		errno = EFAULT;
		return false;
	}

	if(c->reapable) {
		debug(c, "abort() called on closed connection\n");
		errno = EBADF;
		return false;
	}

	c->recv = NULL;
	c->poll = NULL;

	switch(c->state) {
	case CLOSED:
		return true;

	case LISTEN:
	case SYN_SENT:
	case CLOSING:
	case LAST_ACK:
	case TIME_WAIT:
		set_state(c, CLOSED);
		return true;

	case SYN_RECEIVED:
	case ESTABLISHED:
	case FIN_WAIT_1:
	case FIN_WAIT_2:
	case CLOSE_WAIT:
		set_state(c, CLOSED);
		break;
	}

	// Send RST

	struct hdr hdr;

	hdr.src = c->src;
	hdr.dst = c->dst;
	hdr.seq = c->snd.nxt;
	hdr.ack = 0;
	hdr.wnd = 0;
	hdr.ctl = RST;
	hdr.aux = 0;

	print_packet(c, "send", &hdr, sizeof(hdr));
	c->utcp->send(c->utcp, &hdr, sizeof(hdr));
	return true;
}
// Closes all the opened connections
void utcp_abort_all_connections(struct utcp *utcp) {
	if(!utcp) {
		errno = EINVAL;
		return;
	}

	for(int i = 0; i < utcp->nconnections; i++) {
		struct utcp_connection *c = utcp->connections[i];

		if(c->reapable || c->state == CLOSED) {
			continue;
		}

		utcp_recv_t old_recv = c->recv;
		utcp_poll_t old_poll = c->poll;

		reset_connection(c);

		if(old_recv) {
			errno = 0;
			old_recv(c, NULL, 0);
		}

		if(old_poll && !c->reapable) {
			errno = 0;
			old_poll(c, 0);
		}
	}

	return;
}
int utcp_close(struct utcp_connection *c) {
	debug(c, "closing\n");

	if(c->rcvbuf.used) {
		debug(c, "receive buffer not empty, resetting\n");
		return reset_connection(c) ? 0 : -1;
	}

	if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
		return -1;
	}

	c->recv = NULL;
	c->poll = NULL;
	c->reapable = true;
	return 0;
}
int utcp_abort(struct utcp_connection *c) {
	if(!reset_connection(c)) {
		return -1;
	}

	c->reapable = true;
	return 0;
}
/*
 * One call to this function will loop through all connections,
 * checking if something needs to be resent or not.
 * The return value is the time to the next timeout as a struct timespec,
 * or a negative value if the timeout is infinite.
 */
struct timespec utcp_timeout(struct utcp *utcp) {
	struct timespec now;
	clock_gettime(UTCP_CLOCK, &now);
	struct timespec next = {now.tv_sec + 3600, now.tv_nsec};

	for(int i = 0; i < utcp->nconnections; i++) {
		struct utcp_connection *c = utcp->connections[i];

		if(!c) {
			continue;
		}

		// delete connections that have been utcp_close()d.
		if(c->state == CLOSED) {
			if(c->reapable) {
				debug(c, "reaping\n");
				free_connection(c);
				i--;
			}

			continue;
		}

		if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
			errno = ETIMEDOUT;
			c->state = CLOSED;

			if(c->recv) {
				c->recv(c, NULL, 0);
			}

			if(c->poll && !c->reapable) {
				c->poll(c, 0);
			}

			continue;
		}

		if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
			debug(c, "retransmitting after timeout\n");
			retransmit(c);
		}

		if(c->poll) {
			if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
				c->do_poll = false;
				uint32_t len = is_framed(c) ? min(buffer_free(&c->sndbuf), MAX_UNRELIABLE_SIZE) : buffer_free(&c->sndbuf);

				if(len) {
					c->poll(c, len);
				}
			} else if(c->state == CLOSED) {
				continue;
			}
		}

		if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
			next = c->conn_timeout;
		}

		if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
			next = c->rtrx_timeout;
		}
	}

	struct timespec diff;

	timespec_sub(&next, &now, &diff);

	return diff;
}
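/* Typical event-loop usage (illustrative sketch, assuming a POSIX poll()
   loop): feed every incoming datagram to utcp_recv(), then call
   utcp_timeout() and sleep at most the returned interval, e.g.
     struct timespec diff = utcp_timeout(u);
     int ms = diff.tv_sec * 1000 + diff.tv_nsec / 1000000;
     poll(fds, nfds, ms); */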
bool utcp_is_active(struct utcp *utcp) {
	if(!utcp) {
		return false;
	}

	for(int i = 0; i < utcp->nconnections; i++)
		if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
			return true;
		}

	return false;
}
struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
	if(!send) {
		errno = EFAULT;
		return NULL;
	}

	struct utcp *utcp = calloc(1, sizeof(*utcp));

	if(!utcp) {
		return NULL;
	}

	utcp_set_mtu(utcp, DEFAULT_MTU);

	if(!utcp->pkt) {
		free(utcp);
		return NULL;
	}

	if(!CLOCK_GRANULARITY) {
		struct timespec res;
		clock_getres(UTCP_CLOCK, &res);
		CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
	}

	utcp->accept = accept;
	utcp->pre_accept = pre_accept;
	utcp->send = send;
	utcp->priv = priv;
	utcp->timeout = DEFAULT_USER_TIMEOUT; // sec

	return utcp;
}
void utcp_exit(struct utcp *utcp) {
	if(!utcp) {
		return;
	}

	for(int i = 0; i < utcp->nconnections; i++) {
		struct utcp_connection *c = utcp->connections[i];

		if(!c->reapable) {
			if(c->recv) {
				c->recv(c, NULL, 0);
			}

			if(c->poll && !c->reapable) {
				c->poll(c, 0);
			}
		}

		buffer_exit(&c->rcvbuf);
		buffer_exit(&c->sndbuf);
		free(c);
	}

	free(utcp->connections);
	free(utcp->pkt);
	free(utcp);
}
uint16_t utcp_get_mtu(struct utcp *utcp) {
	return utcp ? utcp->mtu : 0;
}

uint16_t utcp_get_mss(struct utcp *utcp) {
	return utcp ? utcp->mss : 0;
}
void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
	if(!utcp) {
		return;
	}

	if(mtu <= sizeof(struct hdr)) {
		return;
	}

	if(mtu > utcp->mtu) {
		char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));

		if(!new) {
			return;
		}

		utcp->pkt = new;
	}

	utcp->mtu = mtu;
	utcp->mss = mtu - sizeof(struct hdr);
}
void utcp_reset_timers(struct utcp *utcp) {
	if(!utcp) {
		return;
	}

	struct timespec now, then;

	clock_gettime(UTCP_CLOCK, &now);

	then = now;
	then.tv_sec += utcp->timeout;

	for(int i = 0; i < utcp->nconnections; i++) {
		struct utcp_connection *c = utcp->connections[i];

		if(c->reapable) {
			continue;
		}

		if(timespec_isset(&c->rtrx_timeout)) {
			c->rtrx_timeout = now;
		}

		if(timespec_isset(&c->conn_timeout)) {
			c->conn_timeout = then;
		}

		c->rtt_start.tv_sec = 0;

		if(c->rto > START_RTO) {
			c->rto = START_RTO;
		}
	}
}
int utcp_get_user_timeout(struct utcp *u) {
	return u ? u->timeout : 0;
}

void utcp_set_user_timeout(struct utcp *u, int timeout) {
	if(u) {
		u->timeout = timeout;
	}
}
size_t utcp_get_sndbuf(struct utcp_connection *c) {
	return c ? c->sndbuf.maxsize : 0;
}

size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
	if(!c) {
		return 0;
	}

	switch(c->state) {
	case SYN_SENT:
	case SYN_RECEIVED:
	case ESTABLISHED:
	case CLOSE_WAIT:
		return buffer_free(&c->sndbuf);

	default:
		return 0;
	}
}

void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
	if(!c) {
		return;
	}

	c->sndbuf.maxsize = size;

	if(c->sndbuf.maxsize != size) {
		c->sndbuf.maxsize = -1;
	}

	c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
}
size_t utcp_get_rcvbuf(struct utcp_connection *c) {
	return c ? c->rcvbuf.maxsize : 0;
}

size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
	if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
		return buffer_free(&c->rcvbuf);
	} else {
		return 0;
	}
}

void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
	if(!c) {
		return;
	}

	c->rcvbuf.maxsize = size;

	if(c->rcvbuf.maxsize != size) {
		c->rcvbuf.maxsize = -1;
	}
}
size_t utcp_get_sendq(struct utcp_connection *c) {
	return c->sndbuf.used;
}

size_t utcp_get_recvq(struct utcp_connection *c) {
	return c->rcvbuf.used;
}
bool utcp_get_nodelay(struct utcp_connection *c) {
	return c ? c->nodelay : false;
}

void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
	if(c) {
		c->nodelay = nodelay;
	}
}

bool utcp_get_keepalive(struct utcp_connection *c) {
	return c ? c->keepalive : false;
}

void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
	if(c) {
		c->keepalive = keepalive;
	}
}
size_t utcp_get_outq(struct utcp_connection *c) {
	return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
}
void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
	if(c) {
		c->recv = recv;
	}
}

void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
	if(c) {
		c->poll = poll;
		c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
	}
}

void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
	if(utcp) {
		utcp->accept = accept;
		utcp->pre_accept = pre_accept;
	}
}
void utcp_expect_data(struct utcp_connection *c, bool expect) {
	if(!c || c->reapable) {
		return;
	}

	if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
		return;
	}

	if(expect) {
		// If we expect data, start the connection timer.
		if(!timespec_isset(&c->conn_timeout)) {
			clock_gettime(UTCP_CLOCK, &c->conn_timeout);
			c->conn_timeout.tv_sec += c->utcp->timeout;
		}
	} else {
		// If we want to cancel expecting data, only clear the timer when there is no unACKed data.
		if(c->snd.una == c->snd.last) {
			timespec_clear(&c->conn_timeout);
		}
	}
}
void utcp_offline(struct utcp *utcp, bool offline) {
	struct timespec now;
	clock_gettime(UTCP_CLOCK, &now);

	for(int i = 0; i < utcp->nconnections; i++) {
		struct utcp_connection *c = utcp->connections[i];

		if(c->reapable) {
			continue;
		}

		utcp_expect_data(c, offline);

		if(!offline) {
			if(timespec_isset(&c->rtrx_timeout)) {
				c->rtrx_timeout = now;
			}

			utcp->connections[i]->rtt_start.tv_sec = 0;

			if(c->rto > START_RTO) {
				c->rto = START_RTO;
			}
		}
	}
}
void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
	utcp->retransmit = cb;
}

void utcp_set_clock_granularity(long granularity) {
	CLOCK_GRANULARITY = granularity;
}

int utcp_get_flush_timeout(struct utcp *utcp) {
	return utcp->flush_timeout;
}

void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
	utcp->flush_timeout = milliseconds;
}