2 utcp.c -- Userspace TCP
3 Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
32 #include "utcp_priv.h"
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
50 #define UTCP_CLOCK CLOCK_MONOTONIC
/* r = a - b. Assumes both inputs are normalized timespecs. */
static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
    r->tv_sec = a->tv_sec - b->tv_sec;
    r->tv_nsec = a->tv_nsec - b->tv_nsec;
    // Borrow one second when the nanosecond subtraction underflowed.
    r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;

/* Difference a - b in microseconds, truncated to 32 bits.
   NOTE(review): overflows for differences beyond ~35 minutes; presumably
   callers only compare nearby timestamps — confirm. */
static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
    return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000;

/* Strictly-less-than comparison of two normalized timespecs. */
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
    if(a->tv_sec == b->tv_sec) {
        return a->tv_nsec < b->tv_nsec;
    return a->tv_sec < b->tv_sec;

/* Reset a timespec so timespec_isset() reports it as unset. */
static void timespec_clear(struct timespec *a) {

/* True if the timespec holds a started (non-cleared) timer value. */
static bool timespec_isset(const struct timespec *a) {
// Clock granularity in microseconds; used as the lower bound "G" in the
// RTO computation of RFC 6298 (see update_rtt()).
static long CLOCK_GRANULARITY; // usec

// Smaller of two sizes.
static inline size_t min(size_t a, size_t b) {

// Larger of two sizes.
static inline size_t max(size_t a, size_t b) {
// Maximum number of payload bytes shown per packet in debug dumps.
#ifndef UTCP_DEBUG_DATALEN
#define UTCP_DEBUG_DATALEN 20

/* Print a timestamped, connection-tagged printf-style message to stderr.
   c may be NULL, in which case the src/dst ports print as 0. */
static void debug(struct utcp_connection *c, const char *format, ...) {
    clock_gettime(CLOCK_REALTIME, &tv);
    len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);

    va_start(ap, format);
    len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);

    // Only emit complete messages; drop silently on error or truncation.
    if(len > 0 && (size_t)len < sizeof(buf)) {
        fwrite(buf, len, 1, stderr);
/* Dump a packet's header fields plus up to UTCP_DEBUG_DATALEN payload
   bytes (as hex) via debug(). dir labels the direction ("send"/"recv"). */
static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
    if(len < sizeof(hdr)) {
        debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);

    // Copy into a local struct hdr: pkt may be unaligned.
    memcpy(&hdr, pkt, sizeof(hdr));

    if(len > sizeof(hdr)) {
        datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);

    const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
    char str[datalen * 2 + 1]; // two hex digits per byte plus NUL

    for(uint32_t i = 0; i < datalen; i++) {
        *p++ = "0123456789ABCDEF"[data[i] >> 4];
        *p++ = "0123456789ABCDEF"[data[i] & 15];

    debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n",
          dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
          hdr.ctl & SYN ? "SYN" : "",
          hdr.ctl & RST ? "RST" : "",
          hdr.ctl & FIN ? "FIN" : "",
          hdr.ctl & ACK ? "ACK" : "",
          hdr.ctl & MF ? "MF" : "",
/* Log the congestion window and slow-start threshold.
   ssthresh == ~0 means "not yet set" and is printed as 0. */
static void debug_cwnd(struct utcp_connection *c) {
    debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);

// Non-debug builds: compile the debug helpers away entirely.
#define debug(...) do {} while(0)
#define print_packet(...) do {} while(0)
#define debug_cwnd(...) do {} while(0)
/* Transition connection c to a new TCP-like state and log it.
   Reaching ESTABLISHED clears the connection timeout. */
static void set_state(struct utcp_connection *c, enum state state) {
    if(state == ESTABLISHED) {
        timespec_clear(&c->conn_timeout);

    debug(c, "state %s\n", strstate[state]);

/* Whether a FIN should accompany data ending at sequence number seq —
   i.e. whether seq reaches the end of everything queued for sending. */
static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
    if(seq != c->snd.last) {
// True if the connection provides reliable (retransmitting) delivery.
static bool is_reliable(struct utcp_connection *c) {
    return c->flags & UTCP_RELIABLE;

// True if the connection preserves application frame boundaries.
static bool is_framed(struct utcp_connection *c) {
    return c->flags & UTCP_FRAMED;

// Signed difference a - b of two 32-bit sequence numbers.
static int32_t seqdiff(uint32_t a, uint32_t b) {

// True if the used region of the circular buffer wraps past its end.
static bool buffer_wraps(struct buffer *buf) {
    return buf->size - buf->offset < buf->used;
/* Resize the circular buffer's backing storage to newsize bytes.
   If the used region wraps, the wrapped tail is shifted so it ends at the
   end of the new allocation, preserving circular ordering.
   Returns false when realloc() fails. */
static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
    char *newdata = realloc(buf->data, newsize);

    if(buffer_wraps(buf)) {
        // Shift the right part of the buffer until it hits the end of the new buffer.
        // [345.........|........012]
        uint32_t tailsize = buf->size - buf->offset;
        uint32_t newoffset = newsize - tailsize;
        memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
        buf->offset = newoffset;
// Store data into the buffer
/* Write len bytes at logical offset `offset` (relative to the start of the
   used region) into the circular buffer, growing the allocation on demand
   up to maxsize. Returns the number of bytes stored, which may be less
   than len when clipped against maxsize; extends buf->used if the write
   ends past the current used region. */
static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
    debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);

    // Ensure we don't store more than maxsize bytes in total
    size_t required = offset + len;

    if(required > buf->maxsize) {
        if(offset >= buf->maxsize) {

        len = buf->maxsize - offset;
        required = buf->maxsize;

    // Check if we need to resize the buffer
    if(required > buf->size) {
        size_t newsize = buf->size;
        // Grow geometrically until the requested region fits, capped at maxsize.
    } while(newsize < required);

    if(newsize > buf->maxsize) {
        newsize = buf->maxsize;

    if(!buffer_resize(buf, newsize)) {

    // Translate the logical offset to a physical position in the ring.
    uint32_t realoffset = buf->offset + offset;

    if(buf->size - buf->offset <= offset) {
        // The offset wrapped
        realoffset -= buf->size;

    if(buf->size - realoffset < len) {
        // The new chunk of data must be wrapped
        memcpy(buf->data + realoffset, data, buf->size - realoffset);
        memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
        memcpy(buf->data + realoffset, data, len);

    if(required > buf->used) {
        buf->used = required;

// Append data at the end of the used region.
static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
    return buffer_put_at(buf, buf->used, data, len);
// Copy data from the buffer without removing it.
/* Copy up to len bytes starting at logical offset `offset` into data,
   clipping against what is actually stored. Handles the wrapped case with
   two memcpy() calls. */
static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
    // Ensure we don't copy more than is actually stored in the buffer
    if(offset >= buf->used) {

    if(buf->used - offset < len) {
        len = buf->used - offset;

    uint32_t realoffset = buf->offset + offset;

    if(buf->size - buf->offset <= offset) {
        // The offset wrapped
        realoffset -= buf->size;

    if(buf->size - realoffset < len) {
        // The data is wrapped
        memcpy(data, buf->data + realoffset, buf->size - realoffset);
        memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
        memcpy(data, buf->data + realoffset, len);
// Copy data from the buffer without removing it.
/* Like buffer_copy(), but hand the bytes directly to the connection's
   recv callback instead of copying into a caller-supplied buffer.
   A wrapped region results in up to two callback invocations. */
static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
    // Ensure we don't copy more than is actually stored in the buffer
    if(offset >= buf->used) {

    if(buf->used - offset < len) {
        len = buf->used - offset;

    uint32_t realoffset = buf->offset + offset;

    if(buf->size - buf->offset <= offset) {
        // The offset wrapped
        realoffset -= buf->size;

    if(buf->size - realoffset < len) {
        // The data is wrapped
        ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);

        // Bail out early if the callback consumed less than offered.
        if(rx1 < buf->size - realoffset) {

        // The channel might have been closed by the previous callback

        ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));

    return c->recv(c, buf->data + realoffset, len);
// Discard data from the buffer.
/* Drop len bytes from the front of the circular buffer (clipped to what
   is stored). */
static ssize_t buffer_discard(struct buffer *buf, size_t len) {
    if(buf->used < len) {

    // NOTE(review): the wrap adjustment appears to rely on unsigned
    // wraparound of buf->offset combined with a later increment — verify
    // against the surrounding (not shown here) statements.
    if(buf->size - buf->offset <= len) {
        buf->offset -= buf->size;

    if(buf->used == len) {

// Drop all buffered data.
static void buffer_clear(struct buffer *buf) {

/* Set the buffer's minimum/maximum sizes; grows the allocation to minsize
   if needed. Returns false when maxsize < minsize or allocation fails. */
static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
    if(maxsize < minsize) {

    buf->maxsize = maxsize;

    return buf->size >= minsize || buffer_resize(buf, minsize);

// Free the backing storage and zero all fields.
static void buffer_exit(struct buffer *buf) {
    memset(buf, 0, sizeof(*buf));

// Bytes that may still be stored before hitting maxsize.
static uint32_t buffer_free(const struct buffer *buf) {
    return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0;
420 // Connections are stored in a sorted list.
421 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
/* qsort()/bsearch() comparator for the connection array:
   orders connections by (src, dst) port pair. */
static int compare(const void *va, const void *vb) {
    const struct utcp_connection *a = *(struct utcp_connection **)va;
    const struct utcp_connection *b = *(struct utcp_connection **)vb;

    // A local port of 0 is never valid for a stored connection.
    assert(a->src && b->src);

    int c = (int)a->src - (int)b->src;

    c = (int)a->dst - (int)b->dst;

/* Look up the connection with local port src and remote port dst.
   Returns NULL when there is no match. */
static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
    if(!utcp->nconnections) {

    struct utcp_connection key = {

    struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
    return match ? *match : NULL;
/* Remove c from the sorted connection array and release its buffers. */
static void free_connection(struct utcp_connection *c) {
    struct utcp *utcp = c->utcp;
    struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);

    // Close the gap left in the sorted array.
    int i = cp - utcp->connections;
    memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
    utcp->nconnections--;

    buffer_exit(&c->rcvbuf);
    buffer_exit(&c->sndbuf);
/* Create a new connection object for the (src, dst) port pair.
   src == 0 requests a random ephemeral local port with the high bit set.
   Returns NULL when the pair is taken, the table is full, or any
   allocation fails. */
static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
    // Check whether this combination of src and dst is free
    if(find_connection(utcp, src, dst)) {

    } else { // If src == 0, generate a random port number with the high bit set
        if(utcp->nconnections >= 32767) {

        src = rand() | 0x8000;

        // Keep trying until the generated port is unused.
        while(find_connection(utcp, src, dst)) {

    // Allocate memory for the new connection

    // Grow the connection array geometrically when it is full.
    if(utcp->nconnections >= utcp->nallocated) {
        if(!utcp->nallocated) {
            utcp->nallocated = 4;
            utcp->nallocated *= 2;

        struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));

        utcp->connections = new_array;

    struct utcp_connection *c = calloc(1, sizeof(*c));

    if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {

    if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
        buffer_exit(&c->sndbuf);

    // Fill in the details
    c->snd.una = c->snd.iss;
    c->snd.nxt = c->snd.iss + 1;
    c->snd.last = c->snd.nxt;
    // Initial congestion window of 2-4 segments depending on the MSS.
    c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
    c->snd.ssthresh = ~0; // effectively "infinite" until the first loss event

    // Add it to the sorted list of connections
    utcp->connections[utcp->nconnections++] = c;
    qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
// Absolute difference |a - b| of two unsigned values.
static inline uint32_t absdiff(uint32_t a, uint32_t b) {

// Update RTT variables. See RFC 6298.
/* Feed one round-trip-time sample (microseconds) into the smoothed RTT
   estimators and recompute the retransmission timeout. */
static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
    debug(c, "invalid rtt\n");

    // Exponentially-weighted moving averages per RFC 6298 section 2.3.
    c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4;
    c->srtt = (c->srtt * 7 + rtt) / 8;

    // RTO = SRTT + max(G, 4*RTTVAR), with G the clock granularity.
    c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY);

    if(c->rto > MAX_RTO) {

    debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto);
/* Arm the retransmission timer at now + c->rto (rto is in microseconds). */
static void start_retransmit_timer(struct utcp_connection *c) {
    clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);

    uint32_t rto = c->rto;

    // Fold whole seconds of the RTO into tv_sec first.
    while(rto > USEC_PER_SEC) {
        c->rtrx_timeout.tv_sec++;

    c->rtrx_timeout.tv_nsec += rto * 1000;

    // Normalize tv_nsec back into [0, NSEC_PER_SEC).
    if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
        c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
        c->rtrx_timeout.tv_sec++;

    debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);

/* Arm a short flush timeout for partially-sent unreliable framed data.
   Reuses the rtrx_timeout field; flush_timeout is in milliseconds. */
static void start_flush_timer(struct utcp_connection *c) {
    clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);

    c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;

    if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
        c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
        c->rtrx_timeout.tv_sec++;

    debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);

// Disarm the retransmission/flush timer.
static void stop_retransmit_timer(struct utcp_connection *c) {
    timespec_clear(&c->rtrx_timeout);
    debug(c, "rtrx_timeout cleared\n");
/* Open a new outgoing connection to remote port dst and send the initial
   SYN. flags select reliability/framing (UTCP_* bits); recv/priv are the
   data callback and its opaque context. Returns NULL when no connection
   could be allocated. */
struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
    struct utcp_connection *c = allocate_connection(utcp, 0, dst);

    // Only the five defined flag bits are allowed.
    assert((flags & ~0x1f) == 0);

    pkt.hdr.src = c->src;
    pkt.hdr.dst = c->dst;
    pkt.hdr.seq = c->snd.iss;
    pkt.hdr.wnd = c->rcvbuf.maxsize;
    // Advertise one 4-byte "init" auxiliary header (type 1, length 1 word).
    pkt.hdr.aux = 0x0101;
    pkt.init[3] = flags & 0x7;

    set_state(c, SYN_SENT);

    print_packet(c, "send", &pkt, sizeof(pkt));
    utcp->send(utcp, &pkt, sizeof(pkt));

    // Give the handshake utcp->timeout seconds to complete.
    clock_gettime(UTCP_CLOCK, &c->conn_timeout);
    c->conn_timeout.tv_sec += utcp->timeout;

    start_retransmit_timer(c);

// Convenience wrapper: connect with default TCP-like (reliable) semantics.
struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
    return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
671 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
672 if(c->reapable || c->state != SYN_RECEIVED) {
673 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
677 debug(c, "accepted %p %p\n", c, recv, priv);
680 set_state(c, ESTABLISHED);
/* Transmit pending data and/or a bare ACK on connection c.
   Sends as many MSS-sized segments as the congestion and send windows
   allow; when sendatleastone is true at least one (possibly empty) ACK
   packet goes out. */
static void ack(struct utcp_connection *c, bool sendatleastone) {
    int32_t left = seqdiff(c->snd.last, c->snd.nxt);
    // Unreliable connections are not congestion-window limited.
    int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE;

    } else if(cwndleft < left) {

    // Prefer sending whole segments unless we are forced to flush.
    if(!sendatleastone || cwndleft > c->utcp->mss) {
        left -= left % c->utcp->mss;

    debug(c, "cwndleft %d left %d\n", cwndleft, left);

    if(!left && !sendatleastone) {

    } *pkt = c->utcp->pkt;

    pkt->hdr.src = c->src;
    pkt->hdr.dst = c->dst;
    pkt->hdr.ack = c->rcv.nxt;
    pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0;

    uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
    pkt->hdr.seq = c->snd.nxt;

    buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);

    c->snd.nxt += seglen;

    if(!is_reliable(c)) {

    // Piggy-back the FIN on the final data segment when a close is pending.
    if(seglen && fin_wanted(c, c->snd.nxt)) {

    if(!c->rtt_start.tv_sec && is_reliable(c)) {
        // Start RTT measurement
        clock_gettime(UTCP_CLOCK, &c->rtt_start);
        c->rtt_seq = pkt->hdr.seq + seglen;
        debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);

    print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
    c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);

    // For unreliable connections the wnd field carries the fragment offset.
    if(left && !is_reliable(c)) {
        pkt->hdr.wnd += seglen;
755 static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
756 size_t rlen = len + (is_framed(c) ? 2 : 0);
762 // Check if we need to be able to buffer all data
764 if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
765 if(rlen > c->sndbuf.maxsize) {
770 if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
775 if(rlen > buffer_free(&c->sndbuf)) {
781 // Add data to the send buffer.
784 uint16_t len16 = len;
785 buffer_put(&c->sndbuf, &len16, sizeof(len16));
786 assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
788 len = buffer_put(&c->sndbuf, data, len);
798 // Don't send anything yet if the connection has not fully established yet
800 if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
806 if(!timespec_isset(&c->rtrx_timeout)) {
807 start_retransmit_timer(c);
810 if(!timespec_isset(&c->conn_timeout)) {
811 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
812 c->conn_timeout.tv_sec += c->utcp->timeout;
819 /* In the send buffer we can have multiple frames, each prefixed with their
820 length. However, the first frame might already have been partially sent. The
821 variable c->frame_offset tracks how much of a partial frame is left at the
822 start. If it is 0, it means there is no partial frame, and the first two
823 bytes in the send buffer are the length of the first frame.
825 After sending an MSS sized packet, we need to calculate the new frame_offset
826 value, since it is likely that the next packet will also have a partial frame
827 at the start. We do this by scanning the previously sent packet for frame
828 headers, to find out how many bytes of the last frame are left to send.
/* Send all complete MSS-sized packets of unreliable framed data.
   The header's wnd field carries c->frame_offset: how many bytes of a
   partial frame sit at the start of the packet (see the comment above). */
static void ack_unreliable_framed(struct utcp_connection *c) {
    int32_t left = seqdiff(c->snd.last, c->snd.nxt);

    } *pkt = c->utcp->pkt;

    pkt->hdr.src = c->src;
    pkt->hdr.dst = c->dst;
    pkt->hdr.ack = c->rcv.nxt;
    pkt->hdr.ctl = ACK | MF;

    bool sent_packet = false;

    // Only full segments go out here; any remainder waits for the flush timer.
    while(left >= c->utcp->mss) {
        pkt->hdr.wnd = c->frame_offset;
        uint32_t seglen = c->utcp->mss;

        pkt->hdr.seq = c->snd.nxt;

        buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);

        // Unreliable: sent data is treated as acknowledged immediately.
        c->snd.nxt += seglen;
        c->snd.una = c->snd.nxt;

        print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
        c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);

        // Calculate the new frame offset
        while(c->frame_offset < seglen) {
            buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
            c->frame_offset += framelen + 2;

        buffer_discard(&c->sndbuf, seglen);
        c->frame_offset -= seglen;

    // We sent one packet but we have partial data left, (re)start the flush timer
    start_flush_timer(c);

    // There is no partial data in the send buffer, so stop the flush timer
    stop_retransmit_timer(c);
/* Flush the remaining (sub-MSS) unreliable framed data as one final
   packet, then stop the flush timer. */
static void flush_unreliable_framed(struct utcp_connection *c) {
    int32_t left = seqdiff(c->snd.last, c->snd.nxt);

    /* If the MSS dropped since last time ack_unreliable_frame() was called,
       we might now have more than one segment worth of data left. */
    if(left > c->utcp->mss) {
        ack_unreliable_framed(c);
        left = seqdiff(c->snd.last, c->snd.nxt);
        assert(left <= c->utcp->mss);

    } *pkt = c->utcp->pkt;

    pkt->hdr.src = c->src;
    pkt->hdr.dst = c->dst;
    pkt->hdr.seq = c->snd.nxt;
    pkt->hdr.ack = c->rcv.nxt;
    pkt->hdr.wnd = c->frame_offset; // partial-frame offset, as in ack_unreliable_framed()
    pkt->hdr.ctl = ACK | MF;

    uint32_t seglen = left;

    buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
    buffer_discard(&c->sndbuf, seglen);

    c->snd.nxt += seglen;
    c->snd.una = c->snd.nxt;

    print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
    c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);

    stop_retransmit_timer(c);
/* Send len bytes on an unreliable connection. Rejects payloads larger
   than MAX_UNRELIABLE_SIZE; framed mode length-prefixes and buffers the
   data, unframed mode treats everything as sent at once. */
static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
    if(len > MAX_UNRELIABLE_SIZE) {

    size_t rlen = len + (is_framed(c) ? 2 : 0);

    if(rlen > buffer_free(&c->sndbuf)) {
        if(rlen > c->sndbuf.maxsize) {

    // Don't send anything yet if the connection has not fully established yet

    if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {

    // Length-prefix the frame before buffering it.
    uint16_t framelen = len;
    buffer_put(&c->sndbuf, &framelen, sizeof(framelen));

    buffer_put(&c->sndbuf, data, len);

    ack_unreliable_framed(c);

    // Unframed: everything is considered sent and acknowledged at once.
    c->snd.una = c->snd.nxt = c->snd.last;
    buffer_discard(&c->sndbuf, c->sndbuf.used);
/* Public send entry point: validate the connection state, then dispatch
   to the reliable or unreliable implementation. */
ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
    debug(c, "send() called on closed connection\n");

    debug(c, "send() called on unconnected connection\n");

    debug(c, "send() called on closed connection\n");

    if(is_reliable(c)) {
        return utcp_send_reliable(c, data, len);

    return utcp_send_unreliable(c, data, len);
// Swap source and destination ports, e.g. to turn a received header into a reply.
static void swap_ports(struct hdr *hdr) {
    uint16_t tmp = hdr->src;
    hdr->src = hdr->dst;

/* RFC 5681 fast retransmit: immediately resend the oldest unacked segment
   (triggered by duplicate ACKs) without waiting for the RTO. */
static void fast_retransmit(struct utcp_connection *c) {
    if(c->state == CLOSED || c->snd.last == c->snd.una) {
        debug(c, "fast_retransmit() called but nothing to retransmit!\n");

    struct utcp *utcp = c->utcp;

    } *pkt = c->utcp->pkt;

    pkt->hdr.src = c->src;
    pkt->hdr.dst = c->dst;
    pkt->hdr.wnd = c->rcvbuf.maxsize;

    // Send unacked data again.
    pkt->hdr.seq = c->snd.una;
    pkt->hdr.ack = c->rcv.nxt;

    uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

    // Include the FIN if this segment reaches the end of queued data.
    if(fin_wanted(c, c->snd.una + len)) {
        pkt->hdr.ctl |= FIN;

    buffer_copy(&c->sndbuf, pkt->data, 0, len);
    print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
    utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
/* RTO expiry handler: retransmit whatever the current state requires
   (SYN, SYN|ACK, or the oldest window of unacked data), fall back to
   slow start per RFC 5681, and restart the timer with backoff. */
static void retransmit(struct utcp_connection *c) {
    if(c->state == CLOSED || c->snd.last == c->snd.una) {
        debug(c, "retransmit() called but nothing to retransmit!\n");
        stop_retransmit_timer(c);

    struct utcp *utcp = c->utcp;

    // Let the application know a retransmission is happening.
    if(utcp->retransmit && is_reliable(c)) {
        utcp->retransmit(c);

    } *pkt = c->utcp->pkt;

    pkt->hdr.src = c->src;
    pkt->hdr.dst = c->dst;
    pkt->hdr.wnd = c->rcvbuf.maxsize;

    // Send our SYN again
    pkt->hdr.seq = c->snd.iss;
    // Same auxiliary "init" header as the original SYN.
    pkt->hdr.aux = 0x0101;
    pkt->data[3] = c->flags & 0x7;
    print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
    utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);

    // Send SYNACK again
    pkt->hdr.seq = c->snd.nxt;
    pkt->hdr.ack = c->rcv.nxt;
    pkt->hdr.ctl = SYN | ACK;
    print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
    utcp->send(utcp, pkt, sizeof(pkt->hdr));

    // Unreliable framed data is flushed rather than retransmitted.
    if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
        flush_unreliable_framed(c);

    // Send unacked data again.
    pkt->hdr.seq = c->snd.una;
    pkt->hdr.ack = c->rcv.nxt;

    uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

    if(fin_wanted(c, c->snd.una + len)) {
        pkt->hdr.ctl |= FIN;

    // RFC 5681 slow start after timeout
    uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
    c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
    c->snd.cwnd = utcp->mss;

    buffer_copy(&c->sndbuf, pkt->data, 0, len);
    print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
    utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);

    c->snd.nxt = c->snd.una + len;

    // We shouldn't need to retransmit anything in this state.
    stop_retransmit_timer(c);

    start_retransmit_timer(c);

    // Exponential backoff is capped at MAX_RTO.
    if(c->rto > MAX_RTO) {

    c->rtt_start.tv_sec = 0; // invalidate RTT timer
    c->dupack = 0; // cancel any ongoing fast recovery
1172 /* Update receive buffer and SACK entries after consuming data.
1176 * |.....0000..1111111111.....22222......3333|
1179 * 0..3 represent the SACK entries. The ^ indicates up to which point we want
to remove data from the receive buffer. The idea is to subtract "len"
1181 * from the offset of all the SACK entries, and then remove/cut down entries
1182 * that are shifted to before the start of the receive buffer.
1184 * There are three cases:
1185 * - the SACK entry is after ^, in that case just change the offset.
1186 * - the SACK entry starts before and ends after ^, so we have to
1187 * change both its offset and size.
1188 * - the SACK entry is completely before ^, in that case delete it.
/* Drop len consumed bytes from the front of the receive buffer and shift
   all SACK entries left accordingly (see the diagram above). */
static void sack_consume(struct utcp_connection *c, size_t len) {
    debug(c, "sack_consume %lu\n", (unsigned long)len);

    if(len > c->rcvbuf.used) {
        debug(c, "all SACK entries consumed\n");
        c->sacks[0].len = 0;

    buffer_discard(&c->rcvbuf, len);

    for(int i = 0; i < NSACKS && c->sacks[i].len;) {
        if(len < c->sacks[i].offset) {
            // Entirely after the cut point: just shift the entry left.
            c->sacks[i].offset -= len;
        } else if(len < c->sacks[i].offset + c->sacks[i].len) {
            // Straddles the cut point: clip its front.
            c->sacks[i].len -= len - c->sacks[i].offset;
            c->sacks[i].offset = 0;
            // Entirely before the cut point: delete the entry.
            if(i < NSACKS - 1) {
                memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
                c->sacks[NSACKS - 1].len = 0;
                c->sacks[i].len = 0;

    for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
        debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
/* Buffer an out-of-order segment at `offset` past rcv.nxt and record it
   in the sorted SACK list, merging with adjacent entries where possible. */
static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
    debug(c, "out of order packet, offset %u\n", offset);
    // Packet loss or reordering occurred. Store the data in the buffer.
    ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);

    debug(c, "packet outside receive buffer, dropping\n");

    if((size_t)rxd < len) {
        debug(c, "packet partially outside receive buffer\n");

    // Make note of where we put it.
    for(int i = 0; i < NSACKS; i++) {
        if(!c->sacks[i].len) { // nothing to merge, add new entry
            debug(c, "new SACK entry %d\n", i);
            c->sacks[i].offset = offset;
            c->sacks[i].len = rxd;
        } else if(offset < c->sacks[i].offset) {
            if(offset + rxd < c->sacks[i].offset) { // insert before
                if(!c->sacks[NSACKS - 1].len) { // only if room left
                    debug(c, "insert SACK entry at %d\n", i);
                    memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
                    c->sacks[i].offset = offset;
                    c->sacks[i].len = rxd;
                    debug(c, "SACK entries full, dropping packet\n");

            debug(c, "merge with start of SACK entry at %d\n", i);
            c->sacks[i].offset = offset;
        } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
            if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
                debug(c, "merge with end of SACK entry at %d\n", i);
                c->sacks[i].len = offset + rxd - c->sacks[i].offset;
                // TODO: handle potential merge with next entry

    for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
        debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);

/* Framed variant: skip past any in-order frame bytes already buffered at
   offset 0 before storing the new out-of-order data. */
static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
    uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;

    // Put the data into the receive buffer
    handle_out_of_order(c, offset + in_order_offset, data, len);
/* Deliver an in-order segment to the application, then check whether the
   first SACK block has become contiguous with it and deliver that too. */
static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
    ssize_t rxd = c->recv(c, data, len);

    if(rxd != (ssize_t)len) {
        // TODO: handle the application not accepting all data.

    // Check if we can process out-of-order data now.
    if(c->sacks[0].len && len >= c->sacks[0].offset) {
        debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);

        if(len < c->sacks[0].offset + c->sacks[0].len) {
            size_t offset = len;
            len = c->sacks[0].offset + c->sacks[0].len;
            size_t remainder = len - offset;

            // Hand the now-contiguous buffered bytes to the application.
            ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);

            if(rxd != (ssize_t)remainder) {
                // TODO: handle the application not accepting all data.

    if(c->rcvbuf.used) {
        sack_consume(c, len);
1322 static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
1323 // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
1324 uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1325 handle_out_of_order(c, in_order_offset, data, len);
1327 // While we have full frames at the start, give them to the application
1328 while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
1330 buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
1332 if(framelen > c->sacks[0].len - 2) {
1338 uint32_t realoffset = c->rcvbuf.offset + 2;
1340 if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
1341 realoffset -= c->rcvbuf.size;
1344 if(realoffset > c->rcvbuf.size - framelen) {
1345 // The buffer wraps, we need to copy
1346 uint8_t buf[framelen];
1347 buffer_copy(&c->rcvbuf, buf, 2, framelen);
1348 rxd = c->recv(c, buf, framelen);
1350 // The frame is contiguous in the receive buffer
1351 rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
1354 if(rxd != (ssize_t)framelen) {
1355 // TODO: handle the application not accepting all data.
1360 sack_consume(c, framelen + 2);
/* Receive path for unreliable unframed connections, including reassembly
   of fragmented packets: hdr->wnd carries the fragment offset and the MF
   flag marks non-final fragments. */
static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
    // Fast path for unfragmented packets
    if(!hdr->wnd && !(hdr->ctl & MF)) {
        c->recv(c, data, len);

    c->rcv.nxt = hdr->seq + len;

    // Ensure reassembled packet are not larger than 64 kiB
    if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {

    // Don't accept out of order fragments
    if(hdr->wnd && hdr->seq != c->rcv.nxt) {

    // Reset the receive buffer for the first fragment
    buffer_clear(&c->rcvbuf);

    ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len);

    if(rxd != (ssize_t)len) {

    // Send the packet if it's the final fragment
    if(!(hdr->ctl & MF)) {
        buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);

    c->rcv.nxt = hdr->seq + len;
/* Receive path for unreliable framed connections: complete any partial
   frame carried over in the receive buffer, deliver the whole frames in
   this packet, then buffer a trailing partial frame for the next packet.
   hdr->wnd is the number of leading bytes belonging to a partial frame. */
static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
    bool in_order = hdr->seq == c->rcv.nxt;
    c->rcv.nxt = hdr->seq + len;

    const uint8_t *ptr = data;

    // Does it start with a partial frame?
    // Only accept the data if it is in order
    if(in_order && c->rcvbuf.used) {
        // In order, append it to the receive buffer
        buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));

        if(hdr->wnd <= len) {
            // We have a full frame
            c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);

    // Exit early if there is other data in this frame
    if(hdr->wnd > len) {

    buffer_clear(&c->rcvbuf);

    // We now start with new frames, so clear any data in the receive buffer
    buffer_clear(&c->rcvbuf);

    // Handle whole frames
    memcpy(&framelen, ptr, sizeof(framelen));

    if(left <= (size_t)framelen + 2) {

    c->recv(c, ptr + 2, framelen);
    ptr += framelen + 2;
    left -= framelen + 2;

    // Handle partial last frame
    buffer_put(&c->rcvbuf, ptr, left);
/* Dispatch received payload to the right handler based on the
   connection's reliability and framing flags; reliable data is split into
   the in-order and out-of-order paths by its offset from rcv.nxt. */
static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
    if(!is_reliable(c)) {
        handle_unreliable_framed(c, hdr, data, len);

        handle_unreliable(c, hdr, data, len);

    uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);

    handle_out_of_order_framed(c, offset, data, len);

    handle_in_order_framed(c, data, len);

    handle_out_of_order(c, offset, data, len);

    handle_in_order(c, data, len);
/*
 * Feed one received packet into the UTCP state machine.
 *
 * utcp - the UTCP context the packet arrived on
 * data - raw packet bytes (header possibly unaligned; copied before use)
 * len  - total packet length in bytes
 *
 * Parses the header, matches the packet to an existing connection (or
 * accepts a new one on SYN when a listen/accept callback is set), validates
 * sequence and ack numbers, processes RST/SYN/FIN flags and payload data,
 * and sends any required reply via utcp->send.
 *
 * NOTE(review): this chunk is an elided view of the function; interior
 * lines (returns, closing braces, several branches) are not visible here.
 */
1491 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1492 const uint8_t *ptr = data;
1508 // Drop packets smaller than the header
1512 if(len < sizeof(hdr)) {
1513 print_packet(NULL, "recv", data, len);
1518 // Make a copy from the potentially unaligned data to a struct hdr
1520 memcpy(&hdr, ptr, sizeof(hdr));
1522 // Try to match the packet to an existing connection
1524 struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1525 print_packet(c, "recv", data, len);
1527 // Process the header
1532 // Drop packets with an unknown CTL flag
1534 if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) {
1535 print_packet(NULL, "recv", data, len);
1540 // Check for auxiliary headers
1542 const uint8_t *init = NULL;
1544 uint16_t aux = hdr.aux;
// NOTE(review): '&' binds looser than '*', so this computes
// (4 * (aux >> 8)) & 0xf rather than 4 * ((aux >> 8) & 0xf).
// Verify against the aux-header encoding; the latter looks intended.
1547 size_t auxlen = 4 * (aux >> 8) & 0xf;
1548 uint8_t auxtype = aux & 0xff;
// The init aux header is only valid on SYN packets and must be 4 bytes.
1557 if(!(hdr.ctl & SYN) || auxlen != 4) {
// Bit 0x800 clear means this was the last aux header in the chain.
1573 if(!(aux & 0x800)) {
1582 memcpy(&aux, ptr, 2);
// SYN and FIN consume sequence space, so they count as "data" here.
1587 bool has_data = len || (hdr.ctl & (SYN | FIN));
1589 // Is it for a new connection?
1592 // Ignore RST packets
1598 // Is it a SYN packet and are we LISTENing?
1600 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1601 // If we don't want to accept it, send a RST back
1602 if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1607 // Try to allocate memory, otherwise send a RST back
1608 c = allocate_connection(utcp, hdr.dst, hdr.src);
1615 // Parse auxilliary information
// Low three bits of the init header's fourth byte carry the
// connection flags requested by the peer.
1622 c->flags = init[3] & 0x7;
1624 c->flags = UTCP_TCP;
1628 // Return SYN+ACK, go to SYN_RECEIVED state
1629 c->snd.wnd = hdr.wnd;
1630 c->rcv.irs = hdr.seq;
1631 c->rcv.nxt = c->rcv.irs + 1;
1632 set_state(c, SYN_RECEIVED);
1639 pkt.hdr.src = c->src;
1640 pkt.hdr.dst = c->dst;
1641 pkt.hdr.ack = c->rcv.irs + 1;
1642 pkt.hdr.seq = c->snd.iss;
1643 pkt.hdr.wnd = c->rcvbuf.maxsize;
1644 pkt.hdr.ctl = SYN | ACK;
// 0x0101: one 4-byte aux header of type 1 (init) follows.
1647 pkt.hdr.aux = 0x0101;
1651 pkt.data[3] = c->flags & 0x7;
1652 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1653 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1656 print_packet(c, "send", &pkt, sizeof(hdr));
1657 utcp->send(utcp, &pkt, sizeof(hdr));
1660 start_retransmit_timer(c);
1662 // No, we don't want your packets, send a RST back
1670 debug(c, "state %s\n", strstate[c->state]);
1672 // In case this is for a CLOSED connection, ignore the packet.
1673 // TODO: make it so incoming packets can never match a CLOSED connection.
1675 if(c->state == CLOSED) {
1676 debug(c, "got packet for closed connection\n");
1680 // It is for an existing connection.
1682 // 1. Drop invalid packets.
1684 // 1a. Drop packets that should not happen in our current state.
1705 // 1b. Discard data that is not in our receive window.
1707 if(is_reliable(c)) {
1710 if(c->state == SYN_SENT) {
1712 } else if(len == 0) {
1713 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1715 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1717 // cut already accepted front overlapping
1718 if(rcv_offset < 0) {
// Note: "(size_t) - rcv_offset" is a cast of the negated offset,
// i.e. (size_t)(-rcv_offset) — not a subtraction.
1719 acceptable = len > (size_t) - rcv_offset;
1724 hdr.seq -= rcv_offset;
1727 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1732 debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1734 // Ignore unacceptable RST packets.
1739 // Otherwise, continue processing.
1744 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1747 debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
1753 c->snd.wnd = hdr.wnd; // TODO: move below
1755 // 1c. Drop packets with an invalid ACK.
1756 // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1757 // (= snd.una + c->sndbuf.used).
// For unreliable connections there is no real ack bookkeeping: force
// the ack to something valid so the checks below pass.
1759 if(!is_reliable(c)) {
1760 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1761 hdr.ack = c->snd.una;
1765 if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1766 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1768 // Ignore unacceptable RST packets.
1776 // 2. Handle RST packets
1781 if(!(hdr.ctl & ACK)) {
1785 // The peer has refused our connection.
1786 set_state(c, CLOSED);
1787 errno = ECONNREFUSED;
1790 c->recv(c, NULL, 0);
1793 if(c->poll && !c->reapable) {
1804 // We haven't told the application about this connection yet. Silently delete.
1816 // The peer has aborted our connection.
1817 set_state(c, CLOSED);
1821 c->recv(c, NULL, 0);
1824 if(c->poll && !c->reapable) {
1837 // As far as the application is concerned, the connection has already been closed.
1838 // If it has called utcp_close() already, we can immediately free this connection.
1844 // Otherwise, immediately move to the CLOSED state.
1845 set_state(c, CLOSED);
1858 if(!(hdr.ctl & ACK)) {
1863 // 3. Advance snd.una
1865 advanced = seqdiff(hdr.ack, c->snd.una);
// RTT measurement (RFC 6298 style): only use the sample if this ack
// covers exactly the sequence number the timer was started for.
1869 if(c->rtt_start.tv_sec) {
1870 if(c->rtt_seq == hdr.ack) {
1871 struct timespec now;
1872 clock_gettime(UTCP_CLOCK, &now);
1873 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1874 update_rtt(c, diff);
1875 c->rtt_start.tv_sec = 0;
1876 } else if(c->rtt_seq < hdr.ack) {
1877 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1878 c->rtt_start.tv_sec = 0;
1882 int32_t data_acked = advanced;
1890 // TODO: handle FIN as well.
1895 assert(data_acked >= 0);
1898 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1899 assert(data_acked <= bufused);
1903 buffer_discard(&c->sndbuf, data_acked);
1905 if(is_reliable(c)) {
1910 // Also advance snd.nxt if possible
1911 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1912 c->snd.nxt = hdr.ack;
1915 c->snd.una = hdr.ack;
// Leaving fast recovery: deflate cwnd back to ssthresh (RFC 5681).
1918 if(c->dupack >= 3) {
1919 debug(c, "fast recovery ended\n");
1920 c->snd.cwnd = c->snd.ssthresh;
1926 // Increase the congestion window according to RFC 5681
1927 if(c->snd.cwnd < c->snd.ssthresh) {
1928 c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1930 c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1933 if(c->snd.cwnd > c->sndbuf.maxsize) {
1934 c->snd.cwnd = c->sndbuf.maxsize;
1939 // Check if we have sent a FIN that is now ACKed.
1942 if(c->snd.una == c->snd.last) {
1943 set_state(c, FIN_WAIT_2);
1949 if(c->snd.una == c->snd.last) {
1950 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1951 c->conn_timeout.tv_sec += utcp->timeout;
1952 set_state(c, TIME_WAIT);
// Pure ACK that did not advance snd.una while data is outstanding:
// count it as a duplicate ACK.
1961 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1963 debug(c, "duplicate ACK %d\n", c->dupack);
1965 if(c->dupack == 3) {
1966 // RFC 5681 fast recovery
// NOTE(review): format string has no conversion for the extra
// c->dupack argument; harmless (excess varargs are ignored) but
// either drop the argument or add %d.
1967 debug(c, "fast recovery started\n", c->dupack);
1968 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1969 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1970 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1972 if(c->snd.cwnd > c->sndbuf.maxsize) {
1973 c->snd.cwnd = c->sndbuf.maxsize;
1979 } else if(c->dupack > 3) {
1980 c->snd.cwnd += utcp->mss;
1982 if(c->snd.cwnd > c->sndbuf.maxsize) {
1983 c->snd.cwnd = c->sndbuf.maxsize;
1989 // We got an ACK which indicates the other side did get one of our packets.
1990 // Reset the retransmission timer to avoid going to slow start,
1991 // but don't touch the connection timeout.
1992 start_retransmit_timer(c);
// 4. Update timers: nothing outstanding -> stop both; otherwise
// rearm retransmit and push the user timeout forward.
1999 if(c->snd.una == c->snd.last) {
2000 stop_retransmit_timer(c);
2001 timespec_clear(&c->conn_timeout);
2002 } else if(is_reliable(c)) {
2003 start_retransmit_timer(c);
2004 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2005 c->conn_timeout.tv_sec += utcp->timeout;
2010 // 5. Process SYN stuff
2016 // This is a SYNACK. It should always have ACKed the SYN.
2021 c->rcv.irs = hdr.seq;
2022 c->rcv.nxt = hdr.seq + 1;
2026 set_state(c, FIN_WAIT_1);
2028 set_state(c, ESTABLISHED);
2034 // This is a retransmit of a SYN, send back the SYNACK.
2044 // This could be a retransmission. Ignore the SYN flag, but send an ACK back.
2055 // 6. Process new data
2057 if(c->state == SYN_RECEIVED) {
2058 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
2063 // Are we still LISTENing?
2065 utcp->accept(c, c->src);
2068 if(c->state != ESTABLISHED) {
2069 set_state(c, CLOSED);
2079 // This should never happen.
2094 // Ehm no, We should never receive more data after a FIN.
2104 handle_incoming_data(c, &hdr, ptr, len);
2107 // 7. Process FIN stuff
// Only honour the FIN once all preceding data has been received
// (for reliable connections); unreliable ones take it immediately.
2109 if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
2113 // This should never happen.
2120 set_state(c, CLOSE_WAIT);
2124 set_state(c, CLOSING);
2128 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2129 c->conn_timeout.tv_sec += utcp->timeout;
2130 set_state(c, TIME_WAIT);
2137 // Ehm, no. We should never receive a second FIN.
2147 // FIN counts as one sequence number
2151 // Inform the application that the peer closed its end of the connection.
// Zero-length recv callback is the EOF signal to the application.
2154 c->recv(c, NULL, 0);
2158 // Now we send something back if:
2159 // - we received data, so we have to send back an ACK
2160 // -> sendatleastone = true
2161 // - or we got an ack, so we should maybe send a bit more data
2162 // -> sendatleastone = false
2164 if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
// Reply with RST|ACK for packets we reject outright.
2179 hdr.ack = hdr.seq + len;
2181 hdr.ctl = RST | ACK;
2184 print_packet(c, "send", &hdr, sizeof(hdr));
2185 utcp->send(utcp, &hdr, sizeof(hdr));
/*
 * Shut down one or both directions of a connection.
 *
 * c   - the connection (validated against NULL/closed state in elided code)
 * dir - UTCP_SHUT_RD, UTCP_SHUT_WR or UTCP_SHUT_RDWR
 *
 * Read shutdown just ignores further incoming data; write shutdown queues
 * a FIN (moving to FIN_WAIT_1 or CLOSING depending on state), flushes
 * framed unreliable data first, sends an ACK and arms the retransmit timer.
 * Returns 0 on success, -1 with errno set on error (elided here).
 */
2190 int utcp_shutdown(struct utcp_connection *c, int dir) {
2191 debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
2199 debug(c, "shutdown() called on closed connection\n");
2204 if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
2209 // TCP does not have a provision for stopping incoming packets.
2210 // The best we can do is to just ignore them.
2211 if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
2215 // The rest of the code deals with shutting down writes.
2216 if(dir == UTCP_SHUT_RD) {
2220 // Only process shutting down writes once.
2238 if(!is_reliable(c) && is_framed(c)) {
2239 flush_unreliable_framed(c);
2242 set_state(c, FIN_WAIT_1);
2250 set_state(c, CLOSING);
// Send the FIN immediately for unreliable connections.
2261 ack(c, !is_reliable(c));
2263 if(!timespec_isset(&c->rtrx_timeout)) {
2264 start_retransmit_timer(c);
/*
 * Forcibly reset a connection: move it to CLOSED and send a single RST
 * to the peer (no retransmission). Returns false on invalid/closed
 * connections (error paths elided), true on success.
 */
2270 static bool reset_connection(struct utcp_connection *c) {
2277 debug(c, "abort() called on closed connection\n");
2294 set_state(c, CLOSED);
2302 set_state(c, CLOSED);
2312 hdr.seq = c->snd.nxt;
2317 print_packet(c, "send", &hdr, sizeof(hdr));
2318 c->utcp->send(c->utcp, &hdr, sizeof(hdr));
2322 // Closes all the opened connections
/*
 * Reset every live connection in the context, then notify each
 * application via its saved recv (EOF) and poll callbacks. Callbacks are
 * captured before reset_connection() so they still fire after the reset.
 */
2323 void utcp_abort_all_connections(struct utcp *utcp) {
2329 for(int i = 0; i < utcp->nconnections; i++) {
2330 struct utcp_connection *c = utcp->connections[i];
// Skip connections that are already dead.
2332 if(c->reapable || c->state == CLOSED) {
2336 utcp_recv_t old_recv = c->recv;
2337 utcp_poll_t old_poll = c->poll;
2339 reset_connection(c);
// recv(NULL, 0) signals EOF/abort to the application.
2343 old_recv(c, NULL, 0);
2346 if(old_poll && !c->reapable) {
/*
 * Close a connection gracefully. If unread data remains in the receive
 * buffer the close degrades to a hard reset. Returns 0 on success, -1 on
 * error (tail elided).
 */
2355 int utcp_close(struct utcp_connection *c) {
2356 debug(c, "closing\n");
2358 if(c->rcvbuf.used) {
2359 debug(c, "receive buffer not empty, resetting\n");
2360 return reset_connection(c) ? 0 : -1;
// NOTE(review): this passes POSIX SHUT_RDWR rather than UTCP_SHUT_RDWR
// as used elsewhere in this file (utcp_shutdown validates against the
// UTCP_ constants). The values may coincide on POSIX — confirm and
// prefer UTCP_SHUT_RDWR for consistency.
2363 if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
/*
 * Abort a connection immediately (RST to peer via reset_connection()).
 * Returns -1 when the reset fails (success path elided).
 */
2373 int utcp_abort(struct utcp_connection *c) {
2374 if(!reset_connection(c)) {
2383 * One call to this function will loop through all connections,
2384 * checking if something needs to be resent or not.
2385 * The return value is the time to the next timeout in milliseconds,
2386 * or maybe a negative value if the timeout is infinite.
2388 struct timespec utcp_timeout(struct utcp *utcp) {
2389 struct timespec now;
2390 clock_gettime(UTCP_CLOCK, &now);
// Default: wake up again in at most an hour.
2391 struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
2393 for(int i = 0; i < utcp->nconnections; i++) {
2394 struct utcp_connection *c = utcp->connections[i];
2400 // delete connections that have been utcp_close()d.
2401 if(c->state == CLOSED) {
2403 debug(c, "reaping\n");
// User timeout expired: notify application (EOF + poll) — teardown elided.
2411 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
2416 c->recv(c, NULL, 0);
2419 if(c->poll && !c->reapable) {
2426 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2427 debug(c, "retransmitting after timeout\n");
// Ask the application for more data when it requested polling and
// there is room in the send buffer.
2432 if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2434 uint32_t len = buffer_free(&c->sndbuf);
2439 } else if(c->state == CLOSED) {
// Track the earliest pending timer across all connections.
2444 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2445 next = c->conn_timeout;
2448 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2449 next = c->rtrx_timeout;
2453 struct timespec diff;
2455 timespec_sub(&next, &now, &diff);
/*
 * Return true if any connection is in a state other than CLOSED or
 * TIME_WAIT, i.e. the context still has live traffic to handle.
 */
2460 bool utcp_is_active(struct utcp *utcp) {
2465 for(int i = 0; i < utcp->nconnections; i++)
2466 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
/*
 * Allocate and initialise a new UTCP context.
 *
 * accept     - called when a new incoming connection is established
 * pre_accept - called first to decide whether to accept a SYN (may be NULL)
 * send       - callback used to transmit raw packets
 * priv       - opaque user pointer (stored in elided code)
 *
 * Also lazily probes the clock resolution once to set CLOCK_GRANULARITY.
 * Returns NULL on allocation failure (elided), otherwise the new context.
 */
2473 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2479 struct utcp *utcp = calloc(1, sizeof(*utcp));
2485 utcp_set_mtu(utcp, DEFAULT_MTU);
// Probe clock resolution only once per process.
2492 if(!CLOCK_GRANULARITY) {
2493 struct timespec res;
2494 clock_getres(UTCP_CLOCK, &res);
2495 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2498 utcp->accept = accept;
2499 utcp->pre_accept = pre_accept;
2502 utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
/*
 * Tear down a UTCP context: notify the application of each surviving
 * connection (EOF + poll), release per-connection buffers, then free the
 * connection array (context free itself elided).
 */
2507 void utcp_exit(struct utcp *utcp) {
2512 for(int i = 0; i < utcp->nconnections; i++) {
2513 struct utcp_connection *c = utcp->connections[i];
2517 c->recv(c, NULL, 0);
2520 if(c->poll && !c->reapable) {
2525 buffer_exit(&c->rcvbuf);
2526 buffer_exit(&c->sndbuf);
2530 free(utcp->connections);
// Return the context's MTU, or 0 if utcp is NULL.
2535 uint16_t utcp_get_mtu(struct utcp *utcp) {
2536 return utcp ? utcp->mtu : 0;
// Return the context's MSS (MTU minus header), or 0 if utcp is NULL.
2539 uint16_t utcp_get_mss(struct utcp *utcp) {
2540 return utcp ? utcp->mss : 0;
/*
 * Set the context MTU. MTUs that cannot even hold a header are rejected;
 * growing the MTU reallocates the scratch packet buffer. MSS is derived
 * as MTU minus the header size.
 */
2543 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2548 if(mtu <= sizeof(struct hdr)) {
2552 if(mtu > utcp->mtu) {
2553 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2563 utcp->mss = mtu - sizeof(struct hdr);
/*
 * Force an immediate retransmission pass: for every connection, make any
 * armed retransmit timer fire now, push the user timeout a full timeout
 * period into the future, cancel in-flight RTT measurements, and clamp
 * inflated RTOs back towards START_RTO (clamp body elided).
 */
2566 void utcp_reset_timers(struct utcp *utcp) {
2571 struct timespec now, then;
2573 clock_gettime(UTCP_CLOCK, &now);
2577 then.tv_sec += utcp->timeout;
2579 for(int i = 0; i < utcp->nconnections; i++) {
2580 struct utcp_connection *c = utcp->connections[i];
2586 if(timespec_isset(&c->rtrx_timeout)) {
2587 c->rtrx_timeout = now;
2590 if(timespec_isset(&c->conn_timeout)) {
2591 c->conn_timeout = then;
// Abandon any RTT sample in progress; the timing is now meaningless.
2594 c->rtt_start.tv_sec = 0;
2596 if(c->rto > START_RTO) {
// Return the user timeout in seconds, or 0 if u is NULL.
2602 int utcp_get_user_timeout(struct utcp *u) {
2603 return u ? u->timeout : 0;
// Set the user timeout in seconds (NULL check elided above the assignment).
2606 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2608 u->timeout = timeout;
// Return the send buffer's maximum size, or 0 if c is NULL.
2612 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2613 return c ? c->sndbuf.maxsize : 0;
// Return free space in the send buffer (state checks elided; returns 0
// for states that cannot accept data).
2616 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2626 return buffer_free(&c->sndbuf);
/*
 * Set the send buffer's maximum size. If the assignment did not take
 * effect (elided validation path), maxsize is set to -1 (i.e. SIZE_MAX,
 * effectively unlimited). Re-evaluates whether the application should be
 * polled for more data.
 */
2633 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2638 c->sndbuf.maxsize = size;
2640 if(c->sndbuf.maxsize != size) {
2641 c->sndbuf.maxsize = -1;
2644 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
// Return the receive buffer's maximum size, or 0 if c is NULL.
2647 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2648 return c ? c->rcvbuf.maxsize : 0;
// Return free space in the receive buffer while the connection can still
// receive data (ESTABLISHED or CLOSE_WAIT); other states elided.
2651 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2652 if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2653 return buffer_free(&c->rcvbuf);
/*
 * Set the receive buffer's maximum size; mirrors utcp_set_sndbuf():
 * if the assignment did not take effect, fall back to -1 (unlimited).
 */
2659 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2664 c->rcvbuf.maxsize = size;
2666 if(c->rcvbuf.maxsize != size) {
2667 c->rcvbuf.maxsize = -1;
// Return the number of bytes queued in the send buffer.
// NOTE(review): unlike the getters above, no NULL check on c is visible.
2671 size_t utcp_get_sendq(struct utcp_connection *c) {
2672 return c->sndbuf.used;
// Return the number of bytes queued in the receive buffer.
// NOTE(review): no NULL check on c is visible, unlike sibling getters.
2675 size_t utcp_get_recvq(struct utcp_connection *c) {
2676 return c->rcvbuf.used;
// Return the nodelay flag, or false if c is NULL.
2679 bool utcp_get_nodelay(struct utcp_connection *c) {
2680 return c ? c->nodelay : false;
// Set the nodelay flag (NULL check elided above the assignment).
2683 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2685 c->nodelay = nodelay;
// Return the keepalive flag, or false if c is NULL.
2689 bool utcp_get_keepalive(struct utcp_connection *c) {
2690 return c ? c->keepalive : false;
// Set the keepalive flag (NULL check elided above the assignment).
2693 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2695 c->keepalive = keepalive;
// Return the number of sent-but-unacknowledged bytes (snd.nxt - snd.una),
// or 0 if c is NULL.
2699 size_t utcp_get_outq(struct utcp_connection *c) {
2700 return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
// Replace the connection's receive callback (assignment elided).
2703 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
// Replace the connection's poll callback and re-evaluate whether the
// application should currently be polled for data to send.
2709 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2712 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
// Replace the context's accept and pre_accept callbacks.
2716 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2718 utcp->accept = accept;
2719 utcp->pre_accept = pre_accept;
/*
 * Tell UTCP whether we expect data from the peer on this connection.
 * Expecting data arms the connection (user) timeout so a silent peer is
 * eventually detected; cancelling clears the timeout only when no unACKed
 * data remains. No-op on invalid/reapable connections or in states where
 * no more data can arrive.
 */
2723 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2724 if(!c || c->reapable) {
2728 if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2733 // If we expect data, start the connection timer.
2734 if(!timespec_isset(&c->conn_timeout)) {
2735 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2736 c->conn_timeout.tv_sec += c->utcp->timeout;
2739 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2740 if(c->snd.una == c->snd.last) {
2741 timespec_clear(&c->conn_timeout);
/*
 * Mark the whole context as going offline (or back online). For every
 * connection this toggles data expectation, makes any armed retransmit
 * timer fire immediately, cancels RTT measurement, and clamps inflated
 * RTOs (clamp body elided).
 */
2746 void utcp_offline(struct utcp *utcp, bool offline) {
2747 struct timespec now;
2748 clock_gettime(UTCP_CLOCK, &now);
2750 for(int i = 0; i < utcp->nconnections; i++) {
2751 struct utcp_connection *c = utcp->connections[i];
2757 utcp_expect_data(c, offline);
2760 if(timespec_isset(&c->rtrx_timeout)) {
2761 c->rtrx_timeout = now;
// Abandon any RTT sample in progress (equivalent to c->rtt_start).
2764 utcp->connections[i]->rtt_start.tv_sec = 0;
2766 if(c->rto > START_RTO) {
// Set the callback invoked when a connection retransmits.
2773 void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
2774 utcp->retransmit = cb;
// Override the process-wide clock granularity (microseconds), normally
// probed once from clock_getres() in utcp_init().
2777 void utcp_set_clock_granularity(long granularity) {
2778 CLOCK_GRANULARITY = granularity;
// Return the flush timeout in milliseconds.
// NOTE(review): no NULL check on utcp, unlike most getters in this file.
2781 int utcp_get_flush_timeout(struct utcp *utcp) {
2782 return utcp->flush_timeout;
// Set the flush timeout in milliseconds.
2785 void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
2786 utcp->flush_timeout = milliseconds;