2 utcp.c -- Userspace TCP
3 Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
32 #include "utcp_priv.h"
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
50 #define UTCP_CLOCK CLOCK_MONOTONIC
54 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
55 r->tv_sec = a->tv_sec - b->tv_sec;
56 r->tv_nsec = a->tv_nsec - b->tv_nsec;
59 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
// Return (a - b) in microseconds as a signed 32-bit value.
// Only meaningful for differences small enough to fit an int32_t (~±35 minutes);
// callers use it for RTT-scale intervals.
static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
	return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000;
}
// Strict "less than" comparison of two timespecs.
// Seconds are compared first; nanoseconds break ties.
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
	if(a->tv_sec == b->tv_sec) {
		return a->tv_nsec < b->tv_nsec;
	} else {
		return a->tv_sec < b->tv_sec;
	}
}
75 static void timespec_clear(struct timespec *a) {
80 static bool timespec_isset(const struct timespec *a) {
84 static long CLOCK_GRANULARITY; // usec
86 static inline size_t min(size_t a, size_t b) {
90 static inline size_t max(size_t a, size_t b) {
97 #ifndef UTCP_DEBUG_DATALEN
98 #define UTCP_DEBUG_DATALEN 20
101 static void debug(struct utcp_connection *c, const char *format, ...) {
106 clock_gettime(CLOCK_REALTIME, &tv);
107 len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);
109 va_start(ap, format);
110 len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
113 if(len > 0 && (size_t)len < sizeof(buf)) {
114 fwrite(buf, len, 1, stderr);
118 static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
121 if(len < sizeof(hdr)) {
122 debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
126 memcpy(&hdr, pkt, sizeof(hdr));
130 if(len > sizeof(hdr)) {
131 datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
137 const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
138 char str[datalen * 2 + 1];
141 for(uint32_t i = 0; i < datalen; i++) {
142 *p++ = "0123456789ABCDEF"[data[i] >> 4];
143 *p++ = "0123456789ABCDEF"[data[i] & 15];
148 debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n",
149 dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
150 hdr.ctl & SYN ? "SYN" : "",
151 hdr.ctl & RST ? "RST" : "",
152 hdr.ctl & FIN ? "FIN" : "",
153 hdr.ctl & ACK ? "ACK" : "",
154 hdr.ctl & MF ? "MF" : "",
159 static void debug_cwnd(struct utcp_connection *c) {
160 debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
163 #define debug(...) do {} while(0)
164 #define print_packet(...) do {} while(0)
165 #define debug_cwnd(...) do {} while(0)
// Transition a connection to a new TCP state and log it.
// NOTE(review): fragment — interior lines were dropped by extraction
// (the assignment c->state = state and closing braces are missing here).
168 static void set_state(struct utcp_connection *c, enum state state) {
// Once the handshake completes, the connection-establishment timeout no
// longer applies, so disarm it.
171 if(state == ESTABLISHED) {
172 timespec_clear(&c->conn_timeout);
175 debug(c, "state %s\n", strstate[state]);
// Whether a FIN should be attached to the segment ending at seq: only when
// seq has reached snd.last and the state calls for closing our side.
// NOTE(review): fragment — the switch on c->state that completes this
// function is missing from the extraction.
178 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
179 if(seq != c->snd.last) {
194 static bool is_reliable(struct utcp_connection *c) {
195 return c->flags & UTCP_RELIABLE;
198 static bool is_framed(struct utcp_connection *c) {
199 return c->flags & UTCP_FRAMED;
202 static int32_t seqdiff(uint32_t a, uint32_t b) {
207 static bool buffer_wraps(struct buffer *buf) {
208 return buf->size - buf->offset < buf->used;
211 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
212 char *newdata = realloc(buf->data, newsize);
220 if(buffer_wraps(buf)) {
221 // Shift the right part of the buffer until it hits the end of the new buffer.
225 // [345.........|........012]
226 uint32_t tailsize = buf->size - buf->offset;
227 uint32_t newoffset = newsize - tailsize;
228 memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
229 buf->offset = newoffset;
236 // Store data into the buffer
237 static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
238 debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
240 // Ensure we don't store more than maxsize bytes in total
241 size_t required = offset + len;
243 if(required > buf->maxsize) {
244 if(offset >= buf->maxsize) {
248 len = buf->maxsize - offset;
249 required = buf->maxsize;
252 // Check if we need to resize the buffer
253 if(required > buf->size) {
254 size_t newsize = buf->size;
262 } while(newsize < required);
264 if(newsize > buf->maxsize) {
265 newsize = buf->maxsize;
268 if(!buffer_resize(buf, newsize)) {
273 uint32_t realoffset = buf->offset + offset;
275 if(buf->size - buf->offset <= offset) {
276 // The offset wrapped
277 realoffset -= buf->size;
280 if(buf->size - realoffset < len) {
281 // The new chunk of data must be wrapped
282 memcpy(buf->data + realoffset, data, buf->size - realoffset);
283 memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
285 memcpy(buf->data + realoffset, data, len);
288 if(required > buf->used) {
289 buf->used = required;
295 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
296 return buffer_put_at(buf, buf->used, data, len);
299 // Copy data from the buffer without removing it.
300 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
301 // Ensure we don't copy more than is actually stored in the buffer
302 if(offset >= buf->used) {
306 if(buf->used - offset < len) {
307 len = buf->used - offset;
310 uint32_t realoffset = buf->offset + offset;
312 if(buf->size - buf->offset <= offset) {
313 // The offset wrapped
314 realoffset -= buf->size;
317 if(buf->size - realoffset < len) {
318 // The data is wrapped
319 memcpy(data, buf->data + realoffset, buf->size - realoffset);
320 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
322 memcpy(data, buf->data + realoffset, len);
328 // Copy data from the buffer without removing it.
329 static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
334 // Ensure we don't copy more than is actually stored in the buffer
335 if(offset >= buf->used) {
339 if(buf->used - offset < len) {
340 len = buf->used - offset;
343 uint32_t realoffset = buf->offset + offset;
345 if(buf->size - buf->offset <= offset) {
346 // The offset wrapped
347 realoffset -= buf->size;
350 if(buf->size - realoffset < len) {
351 // The data is wrapped
352 ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);
354 if(rx1 < buf->size - realoffset) {
358 // The channel might have been closed by the previous callback
363 ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));
371 return c->recv(c, buf->data + realoffset, len);
375 // Discard data from the buffer.
// NOTE(review): fragment — interior lines were dropped by extraction
// (the len clamp body, the offset advance, used -= len and the return
// are missing here).
376 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
377 if(buf->used < len) {
// NOTE(review): this relies on unsigned wrap-around: offset is first pushed
// below zero (mod 2^32) and the missing `buf->offset += len;` later brings it
// back into range when the discarded span crosses the end of the ring.
// Confirm against upstream before touching.
381 if(buf->size - buf->offset <= len) {
382 buf->offset -= buf->size;
// When everything is consumed, the offset can be reset to the start so
// subsequent data is stored contiguously.
385 if(buf->used == len) {
// Reset the buffer to empty without releasing its storage.
396 static void buffer_clear(struct buffer *buf) {
// Set the buffer's hard cap (maxsize) and ensure at least minsize bytes are
// allocated up front. Returns false if maxsize < minsize or allocation fails.
401 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
402 if(maxsize < minsize) {
406 buf->maxsize = maxsize;
408 return buf->size >= minsize || buffer_resize(buf, minsize);
411 static void buffer_exit(struct buffer *buf) {
413 memset(buf, 0, sizeof(*buf));
416 static uint32_t buffer_free(const struct buffer *buf) {
417 return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0;
420 // Connections are stored in a sorted list.
421 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
423 static int compare(const void *va, const void *vb) {
426 const struct utcp_connection *a = *(struct utcp_connection **)va;
427 const struct utcp_connection *b = *(struct utcp_connection **)vb;
430 assert(a->src && b->src);
432 int c = (int)a->src - (int)b->src;
438 c = (int)a->dst - (int)b->dst;
442 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
443 if(!utcp->nconnections) {
447 struct utcp_connection key = {
451 struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
452 return match ? *match : NULL;
455 static void free_connection(struct utcp_connection *c) {
456 struct utcp *utcp = c->utcp;
457 struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
461 int i = cp - utcp->connections;
462 memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
463 utcp->nconnections--;
465 buffer_exit(&c->rcvbuf);
466 buffer_exit(&c->sndbuf);
470 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
471 // Check whether this combination of src and dst is free
474 if(find_connection(utcp, src, dst)) {
478 } else { // If src == 0, generate a random port number with the high bit set
479 if(utcp->nconnections >= 32767) {
484 src = rand() | 0x8000;
486 while(find_connection(utcp, src, dst)) {
491 // Allocate memory for the new connection
493 if(utcp->nconnections >= utcp->nallocated) {
494 if(!utcp->nallocated) {
495 utcp->nallocated = 4;
497 utcp->nallocated *= 2;
500 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
506 utcp->connections = new_array;
509 struct utcp_connection *c = calloc(1, sizeof(*c));
515 if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
520 if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
521 buffer_exit(&c->sndbuf);
526 // Fill in the details
535 c->snd.una = c->snd.iss;
536 c->snd.nxt = c->snd.iss + 1;
537 c->snd.last = c->snd.nxt;
538 c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
539 c->snd.ssthresh = ~0;
546 // Add it to the sorted list of connections
548 utcp->connections[utcp->nconnections++] = c;
549 qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
554 static inline uint32_t absdiff(uint32_t a, uint32_t b) {
562 // Update RTT variables. See RFC 6298.
// NOTE(review): fragment — the rtt==0 early return, the first-measurement
// branch (srtt = rtt, rttvar = rtt/2) and closing braces were dropped by
// extraction.
563 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
565 debug(c, "invalid rtt\n");
// RFC 6298 EWMA updates with beta = 1/4 and alpha = 1/8:
// rttvar <- (1 - beta) * rttvar + beta * |srtt - rtt|
573 c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4;
// srtt <- (1 - alpha) * srtt + alpha * rtt
574 c->srtt = (c->srtt * 7 + rtt) / 8;
// RTO = srtt + max(G, 4 * rttvar), with G the clock granularity (RFC 6298 §2.3).
577 c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY);
// Clamp to the compile-time ceiling; the clamping assignment itself was lost
// in extraction.
579 if(c->rto > MAX_RTO) {
583 debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto);
586 static void start_retransmit_timer(struct utcp_connection *c) {
587 clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
589 uint32_t rto = c->rto;
591 while(rto > USEC_PER_SEC) {
592 c->rtrx_timeout.tv_sec++;
596 c->rtrx_timeout.tv_nsec += rto * 1000;
598 if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
599 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
600 c->rtrx_timeout.tv_sec++;
603 debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
606 static void start_flush_timer(struct utcp_connection *c) {
607 clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
609 c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;
611 if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
612 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
613 c->rtrx_timeout.tv_sec++;
616 debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
619 static void stop_retransmit_timer(struct utcp_connection *c) {
620 timespec_clear(&c->rtrx_timeout);
621 debug(c, "rtrx_timeout cleared\n");
624 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
625 struct utcp_connection *c = allocate_connection(utcp, 0, dst);
631 assert((flags & ~0x1f) == 0);
642 pkt.hdr.src = c->src;
643 pkt.hdr.dst = c->dst;
644 pkt.hdr.seq = c->snd.iss;
646 pkt.hdr.wnd = c->rcvbuf.maxsize;
648 pkt.hdr.aux = 0x0101;
652 pkt.init[3] = flags & 0x7;
654 set_state(c, SYN_SENT);
656 print_packet(c, "send", &pkt, sizeof(pkt));
657 utcp->send(utcp, &pkt, sizeof(pkt));
659 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
660 c->conn_timeout.tv_sec += utcp->timeout;
662 start_retransmit_timer(c);
667 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
668 return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
// Accept an incoming connection: attach the receive callback and user data,
// then mark it ESTABLISHED. Only valid in SYN_RECEIVED.
// NOTE(review): fragment — the early return and the c->recv/c->priv
// assignments were dropped by extraction.
671 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
672 if(c->reapable || c->state != SYN_RECEIVED) {
// NOTE(review): format/argument mismatch — one "%s" conversion but two
// variadic arguments (c, strstate[...]). The format was presumably
// "... connection %p in state %s\n"; verify against upstream.
673 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
// NOTE(review): same issue — two "%p" conversions but three variadic
// arguments (c, recv, priv).
677 debug(c, "accepted %p %p\n", c, recv, priv);
681 set_state(c, ESTABLISHED);
684 static void ack(struct utcp_connection *c, bool sendatleastone) {
685 int32_t left = seqdiff(c->snd.last, c->snd.nxt);
686 int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE;
692 } else if(cwndleft < left) {
695 if(!sendatleastone || cwndleft > c->utcp->mss) {
696 left -= left % c->utcp->mss;
700 debug(c, "cwndleft %d left %d\n", cwndleft, left);
702 if(!left && !sendatleastone) {
709 } *pkt = c->utcp->pkt;
711 pkt->hdr.src = c->src;
712 pkt->hdr.dst = c->dst;
713 pkt->hdr.ack = c->rcv.nxt;
714 pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0;
719 uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
720 pkt->hdr.seq = c->snd.nxt;
722 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
724 c->snd.nxt += seglen;
727 if(!is_reliable(c)) {
735 if(seglen && fin_wanted(c, c->snd.nxt)) {
740 if(!c->rtt_start.tv_sec && is_reliable(c)) {
741 // Start RTT measurement
742 clock_gettime(UTCP_CLOCK, &c->rtt_start);
743 c->rtt_seq = pkt->hdr.seq + seglen;
744 debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
747 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
748 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
750 if(left && !is_reliable(c)) {
751 pkt->hdr.wnd += seglen;
756 static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
757 size_t rlen = len + (is_framed(c) ? 2 : 0);
763 // Check if we need to be able to buffer all data
765 if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
766 if(rlen > c->sndbuf.maxsize) {
771 if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
776 if(rlen > buffer_free(&c->sndbuf)) {
782 // Add data to the send buffer.
785 uint16_t len16 = len;
786 buffer_put(&c->sndbuf, &len16, sizeof(len16));
787 assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
789 len = buffer_put(&c->sndbuf, data, len);
799 // Don't send anything yet if the connection has not fully established yet
801 if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
807 if(!timespec_isset(&c->rtrx_timeout)) {
808 start_retransmit_timer(c);
811 if(!timespec_isset(&c->conn_timeout)) {
812 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
813 c->conn_timeout.tv_sec += c->utcp->timeout;
820 /* In the send buffer we can have multiple frames, each prefixed with their
821 length. However, the first frame might already have been partially sent. The
822 variable c->frame_offset tracks how much of a partial frame is left at the
823 start. If it is 0, it means there is no partial frame, and the first two
824 bytes in the send buffer are the length of the first frame.
826 After sending an MSS sized packet, we need to calculate the new frame_offset
827 value, since it is likely that the next packet will also have a partial frame
828 at the start. We do this by scanning the previously sent packet for frame
829 headers, to find out how many bytes of the last frame are left to send.
831 static void ack_unreliable_framed(struct utcp_connection *c) {
832 int32_t left = seqdiff(c->snd.last, c->snd.nxt);
838 } *pkt = c->utcp->pkt;
840 pkt->hdr.src = c->src;
841 pkt->hdr.dst = c->dst;
842 pkt->hdr.ack = c->rcv.nxt;
843 pkt->hdr.ctl = ACK | MF;
846 bool sent_packet = false;
848 while(left >= c->utcp->mss) {
849 pkt->hdr.wnd = c->frame_offset;
850 uint32_t seglen = c->utcp->mss;
852 pkt->hdr.seq = c->snd.nxt;
854 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
856 c->snd.nxt += seglen;
857 c->snd.una = c->snd.nxt;
860 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
861 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
864 // Calculate the new frame offset
865 while(c->frame_offset < seglen) {
867 buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
868 c->frame_offset += framelen + 2;
871 buffer_discard(&c->sndbuf, seglen);
872 c->frame_offset -= seglen;
877 // We sent one packet but we have partial data left, (re)start the flush timer
878 if(!timespec_isset(&c->rtrx_timeout)) {
879 c->flush_needed = true;
882 start_flush_timer(c);
884 // There is no partial data in the send buffer, so stop the flush timer
885 stop_retransmit_timer(c);
887 } else if(left && !timespec_isset(&c->rtrx_timeout)) {
888 // We have partial data and we didn't start the flush timer yet
889 c->flush_needed = true;
890 start_flush_timer(c);
894 static void flush_unreliable_framed(struct utcp_connection *c) {
895 int32_t left = seqdiff(c->snd.last, c->snd.nxt);
897 /* If the MSS dropped since last time ack_unreliable_frame() was called,
898 we might now have more than one segment worth of data left.
900 if(left > c->utcp->mss) {
901 ack_unreliable_framed(c);
902 left = seqdiff(c->snd.last, c->snd.nxt);
903 assert(left <= c->utcp->mss);
910 } *pkt = c->utcp->pkt;
912 pkt->hdr.src = c->src;
913 pkt->hdr.dst = c->dst;
914 pkt->hdr.seq = c->snd.nxt;
915 pkt->hdr.ack = c->rcv.nxt;
916 pkt->hdr.wnd = c->frame_offset;
917 pkt->hdr.ctl = ACK | MF;
920 uint32_t seglen = left;
922 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
923 buffer_discard(&c->sndbuf, seglen);
925 c->snd.nxt += seglen;
926 c->snd.una = c->snd.nxt;
928 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
929 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
933 stop_retransmit_timer(c);
937 static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
938 if(len > MAX_UNRELIABLE_SIZE) {
943 size_t rlen = len + (is_framed(c) ? 2 : 0);
945 if(rlen > buffer_free(&c->sndbuf)) {
946 if(rlen > c->sndbuf.maxsize) {
955 // Don't send anything yet if the connection has not fully established yet
957 if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
962 uint16_t framelen = len;
963 buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
966 buffer_put(&c->sndbuf, data, len);
971 ack_unreliable_framed(c);
974 c->snd.una = c->snd.nxt = c->snd.last;
975 buffer_discard(&c->sndbuf, c->sndbuf.used);
981 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
983 debug(c, "send() called on closed connection\n");
991 debug(c, "send() called on unconnected connection\n");
1006 debug(c, "send() called on closed connection\n");
1016 if(is_reliable(c)) {
1017 return utcp_send_reliable(c, data, len);
1019 return utcp_send_unreliable(c, data, len);
1023 static void swap_ports(struct hdr *hdr) {
1024 uint16_t tmp = hdr->src;
1025 hdr->src = hdr->dst;
1029 static void fast_retransmit(struct utcp_connection *c) {
1030 if(c->state == CLOSED || c->snd.last == c->snd.una) {
1031 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
1035 struct utcp *utcp = c->utcp;
1040 } *pkt = c->utcp->pkt;
1042 pkt->hdr.src = c->src;
1043 pkt->hdr.dst = c->dst;
1044 pkt->hdr.wnd = c->rcvbuf.maxsize;
1053 // Send unacked data again.
1054 pkt->hdr.seq = c->snd.una;
1055 pkt->hdr.ack = c->rcv.nxt;
1057 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1059 if(fin_wanted(c, c->snd.una + len)) {
1061 pkt->hdr.ctl |= FIN;
1064 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1065 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1066 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
1074 static void retransmit(struct utcp_connection *c) {
1075 if(c->state == CLOSED || c->snd.last == c->snd.una) {
1076 debug(c, "retransmit() called but nothing to retransmit!\n");
1077 stop_retransmit_timer(c);
1081 struct utcp *utcp = c->utcp;
1083 if(utcp->retransmit && is_reliable(c)) {
1084 utcp->retransmit(c);
1090 } *pkt = c->utcp->pkt;
1092 pkt->hdr.src = c->src;
1093 pkt->hdr.dst = c->dst;
1094 pkt->hdr.wnd = c->rcvbuf.maxsize;
1099 // Send our SYN again
1100 pkt->hdr.seq = c->snd.iss;
1103 pkt->hdr.aux = 0x0101;
1107 pkt->data[3] = c->flags & 0x7;
1108 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
1109 utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
1113 // Send SYNACK again
1114 pkt->hdr.seq = c->snd.nxt;
1115 pkt->hdr.ack = c->rcv.nxt;
1116 pkt->hdr.ctl = SYN | ACK;
1117 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
1118 utcp->send(utcp, pkt, sizeof(pkt->hdr));
1126 if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
1127 flush_unreliable_framed(c);
1131 // Send unacked data again.
1132 pkt->hdr.seq = c->snd.una;
1133 pkt->hdr.ack = c->rcv.nxt;
1135 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1137 if(fin_wanted(c, c->snd.una + len)) {
1139 pkt->hdr.ctl |= FIN;
1142 // RFC 5681 slow start after timeout
1143 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1144 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1145 c->snd.cwnd = utcp->mss;
1148 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1149 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1150 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
1152 c->snd.nxt = c->snd.una + len;
1159 // We shouldn't need to retransmit anything in this state.
1163 stop_retransmit_timer(c);
1167 start_retransmit_timer(c);
1170 if(c->rto > MAX_RTO) {
1174 c->rtt_start.tv_sec = 0; // invalidate RTT timer
1175 c->dupack = 0; // cancel any ongoing fast recovery
1181 /* Update receive buffer and SACK entries after consuming data.
1185 * |.....0000..1111111111.....22222......3333|
1188 * 0..3 represent the SACK entries. The ^ indicates up to which point we want
1189 * to remove data from the receive buffer. The idea is to substract "len"
1190 * from the offset of all the SACK entries, and then remove/cut down entries
1191 * that are shifted to before the start of the receive buffer.
1193 * There are three cases:
1194 * - the SACK entry is after ^, in that case just change the offset.
1195 * - the SACK entry starts before and ends after ^, so we have to
1196 * change both its offset and size.
1197 * - the SACK entry is completely before ^, in that case delete it.
1199 static void sack_consume(struct utcp_connection *c, size_t len) {
1200 debug(c, "sack_consume %lu\n", (unsigned long)len);
1202 if(len > c->rcvbuf.used) {
1203 debug(c, "all SACK entries consumed\n");
1204 c->sacks[0].len = 0;
1208 buffer_discard(&c->rcvbuf, len);
1210 for(int i = 0; i < NSACKS && c->sacks[i].len;) {
1211 if(len < c->sacks[i].offset) {
1212 c->sacks[i].offset -= len;
1214 } else if(len < c->sacks[i].offset + c->sacks[i].len) {
1215 c->sacks[i].len -= len - c->sacks[i].offset;
1216 c->sacks[i].offset = 0;
1219 if(i < NSACKS - 1) {
1220 memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
1221 c->sacks[NSACKS - 1].len = 0;
1223 c->sacks[i].len = 0;
1229 for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1230 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1234 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1235 debug(c, "out of order packet, offset %u\n", offset);
1236 // Packet loss or reordering occured. Store the data in the buffer.
1237 ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
1240 debug(c, "packet outside receive buffer, dropping\n");
1244 if((size_t)rxd < len) {
1245 debug(c, "packet partially outside receive buffer\n");
1249 // Make note of where we put it.
1250 for(int i = 0; i < NSACKS; i++) {
1251 if(!c->sacks[i].len) { // nothing to merge, add new entry
1252 debug(c, "new SACK entry %d\n", i);
1253 c->sacks[i].offset = offset;
1254 c->sacks[i].len = rxd;
1256 } else if(offset < c->sacks[i].offset) {
1257 if(offset + rxd < c->sacks[i].offset) { // insert before
1258 if(!c->sacks[NSACKS - 1].len) { // only if room left
1259 debug(c, "insert SACK entry at %d\n", i);
1260 memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
1261 c->sacks[i].offset = offset;
1262 c->sacks[i].len = rxd;
1264 debug(c, "SACK entries full, dropping packet\n");
1269 debug(c, "merge with start of SACK entry at %d\n", i);
1270 c->sacks[i].offset = offset;
1273 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1274 if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1275 debug(c, "merge with end of SACK entry at %d\n", i);
1276 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1277 // TODO: handle potential merge with next entry
1284 for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1285 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1289 static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1290 uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1292 // Put the data into the receive buffer
1293 handle_out_of_order(c, offset + in_order_offset, data, len);
1296 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1298 ssize_t rxd = c->recv(c, data, len);
1300 if(rxd != (ssize_t)len) {
1301 // TODO: handle the application not accepting all data.
1306 // Check if we can process out-of-order data now.
1307 if(c->sacks[0].len && len >= c->sacks[0].offset) {
1308 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1310 if(len < c->sacks[0].offset + c->sacks[0].len) {
1311 size_t offset = len;
1312 len = c->sacks[0].offset + c->sacks[0].len;
1313 size_t remainder = len - offset;
1315 ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);
1317 if(rxd != (ssize_t)remainder) {
1318 // TODO: handle the application not accepting all data.
1324 if(c->rcvbuf.used) {
1325 sack_consume(c, len);
// Deliver framed in-order data: buffer it, then hand complete length-prefixed
// frames to the application one at a time.
// NOTE(review): fragment — several interior lines (variable declarations such
// as `uint16_t framelen;` and `ssize_t rxd;`, plus braces/breaks) were
// dropped by extraction.
1331 static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
1332 // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
1333 uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1334 handle_out_of_order(c, in_order_offset, data, len);
1336 // While we have full frames at the start, give them to the application
1337 while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
// NOTE(review): BUG — sizeof(&framelen) is the size of a *pointer* (8 on
// LP64), not the 2-byte length prefix; this should be sizeof(framelen).
// As written it over-reads past the prefix into frame payload.
1339 buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
// Frame not yet complete in the buffer: wait for more data.
1341 if(framelen > c->sacks[0].len - 2) {
// Compute where the frame payload starts in the ring buffer, skipping the
// 2-byte length prefix and handling offset wrap-around.
1347 uint32_t realoffset = c->rcvbuf.offset + 2;
1349 if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
1350 realoffset -= c->rcvbuf.size;
1353 if(realoffset > c->rcvbuf.size - framelen) {
1354 // The buffer wraps, we need to copy
1355 uint8_t buf[framelen];
1356 buffer_copy(&c->rcvbuf, buf, 2, framelen);
1357 rxd = c->recv(c, buf, framelen);
1359 // The frame is contiguous in the receive buffer
1360 rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
1363 if(rxd != (ssize_t)framelen) {
1364 // TODO: handle the application not accepting all data.
// Drop the delivered frame plus its 2-byte prefix and shift SACK entries.
1369 sack_consume(c, framelen + 2);
1375 static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1376 // Fast path for unfragmented packets
1377 if(!hdr->wnd && !(hdr->ctl & MF)) {
1379 c->recv(c, data, len);
1382 c->rcv.nxt = hdr->seq + len;
1386 // Ensure reassembled packet are not larger than 64 kiB
1387 if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
1391 // Don't accept out of order fragments
1392 if(hdr->wnd && hdr->seq != c->rcv.nxt) {
1396 // Reset the receive buffer for the first fragment
1398 buffer_clear(&c->rcvbuf);
1401 ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len);
1403 if(rxd != (ssize_t)len) {
1407 // Send the packet if it's the final fragment
1408 if(!(hdr->ctl & MF)) {
1409 buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);
1412 c->rcv.nxt = hdr->seq + len;
1415 static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1416 bool in_order = hdr->seq == c->rcv.nxt;
1417 c->rcv.nxt = hdr->seq + len;
1419 const uint8_t *ptr = data;
1422 // Does it start with a partial frame?
1424 // Only accept the data if it is in order
1425 if(in_order && c->rcvbuf.used) {
1426 // In order, append it to the receive buffer
1427 buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
1429 if(hdr->wnd <= len) {
1430 // We have a full frame
1431 c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
1435 // Exit early if there is other data in this frame
1436 if(hdr->wnd > len) {
1438 buffer_clear(&c->rcvbuf);
1448 // We now start with new frames, so clear any data in the receive buffer
1449 buffer_clear(&c->rcvbuf);
1451 // Handle whole frames
1454 memcpy(&framelen, ptr, sizeof(framelen));
1456 if(left < (size_t)framelen + 2) {
1460 c->recv(c, ptr + 2, framelen);
1461 ptr += framelen + 2;
1462 left -= framelen + 2;
1465 // Handle partial last frame
1467 buffer_put(&c->rcvbuf, ptr, left);
1471 static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1472 if(!is_reliable(c)) {
1474 handle_unreliable_framed(c, hdr, data, len);
1476 handle_unreliable(c, hdr, data, len);
1482 uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
1486 handle_out_of_order_framed(c, offset, data, len);
1488 handle_in_order_framed(c, data, len);
1492 handle_out_of_order(c, offset, data, len);
1494 handle_in_order(c, data, len);
// Feed one received packet into the uTCP state machine.
// This is the core receive path: it validates the header, matches or
// creates a connection, and walks the RFC 793-style processing steps
// (numbered comments 1..7 below). Returns the number of bytes consumed,
// or -1 with errno set on error.
// NOTE(review): many lines are elided in this excerpt; comments below
// only annotate what is visible.
1500 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1501 const uint8_t *ptr = data;
1517 // Drop packets smaller than the header
1521 if(len < sizeof(hdr)) {
1522 print_packet(NULL, "recv", data, len);
1527 // Make a copy from the potentially unaligned data to a struct hdr
1529 memcpy(&hdr, ptr, sizeof(hdr));
1531 // Try to match the packet to an existing connection
1533 struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1534 print_packet(c, "recv", data, len);
1536 // Process the header
1541 // Drop packets with an unknown CTL flag
1543 if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) {
1544 print_packet(NULL, "recv", data, len);
1549 // Check for auxiliary headers
1551 const uint8_t *init = NULL;
1553 uint16_t aux = hdr.aux;
// NOTE(review): `*` binds tighter than `&`, so this computes
// (4 * (aux >> 8)) & 0xf, not 4 * ((aux >> 8) & 0xf). If the high byte
// encodes a length in 32-bit words, the intended form is probably the
// latter — confirm against the aux header format before changing.
1556 size_t auxlen = 4 * (aux >> 8) & 0xf;
1557 uint8_t auxtype = aux & 0xff;
// The init aux header is only valid on a SYN and must be exactly 4 bytes.
1566 if(!(hdr.ctl & SYN) || auxlen != 4) {
// Bit 0x800 marks "more aux headers follow".
1582 if(!(aux & 0x800)) {
1591 memcpy(&aux, ptr, 2);
// SYN and FIN occupy sequence space, so they count as data here.
1596 bool has_data = len || (hdr.ctl & (SYN | FIN));
1598 // Is it for a new connection?
1601 // Ignore RST packets
1607 // Is it a SYN packet and are we LISTENing?
1609 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1610 // If we don't want to accept it, send a RST back
1611 if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1616 // Try to allocate memory, otherwise send a RST back
1617 c = allocate_connection(utcp, hdr.dst, hdr.src)
1624 // Parse auxilliary information
// Low 3 bits of the init aux header select the connection type flags.
1631 c->flags = init[3] & 0x7;
1633 c->flags = UTCP_TCP;
1637 // Return SYN+ACK, go to SYN_RECEIVED state
1638 c->snd.wnd = hdr.wnd;
1639 c->rcv.irs = hdr.seq;
1640 c->rcv.nxt = c->rcv.irs + 1;
1641 set_state(c, SYN_RECEIVED);
1648 pkt.hdr.src = c->src;
1649 pkt.hdr.dst = c->dst;
1650 pkt.hdr.ack = c->rcv.irs + 1;
1651 pkt.hdr.seq = c->snd.iss;
1652 pkt.hdr.wnd = c->rcvbuf.maxsize;
1653 pkt.hdr.ctl = SYN | ACK;
// 0x0101: one init aux header (type 1, length 1 word).
1656 pkt.hdr.aux = 0x0101;
1660 pkt.data[3] = c->flags & 0x7;
1661 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1662 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1665 print_packet(c, "send", &pkt, sizeof(hdr));
1666 utcp->send(utcp, &pkt, sizeof(hdr));
1669 start_retransmit_timer(c);
1671 // No, we don't want your packets, send a RST back
1679 debug(c, "state %s\n", strstate[c->state]);
1681 // In case this is for a CLOSED connection, ignore the packet.
1682 // TODO: make it so incoming packets can never match a CLOSED connection.
1684 if(c->state == CLOSED) {
1685 debug(c, "got packet for closed connection\n");
1689 // It is for an existing connection.
1691 // 1. Drop invalid packets.
1693 // 1a. Drop packets that should not happen in our current state.
1714 // 1b. Discard data that is not in our receive window.
1716 if(is_reliable(c)) {
1719 if(c->state == SYN_SENT) {
1721 } else if(len == 0) {
1722 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1724 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1726 // cut already accepted front overlapping
1727 if(rcv_offset < 0) {
// NOTE: this is a cast of the negated value, (size_t)(-rcv_offset),
// not a subtraction — the packet is acceptable only if it extends
// past the data we already accepted.
1728 acceptable = len > (size_t) - rcv_offset;
// Trim the already-accepted prefix (rcv_offset is negative here).
1733 hdr.seq -= rcv_offset;
1736 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1741 debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1743 // Ignore unacceptable RST packets.
1748 // Otherwise, continue processing.
1753 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1756 debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
1762 c->snd.wnd = hdr.wnd; // TODO: move below
1764 // 1c. Drop packets with an invalid ACK.
1765 // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1766 // (= snd.una + c->sndbuf.used).
// For unreliable connections the peer echoes snd.last; anything else
// is normalized to snd.una so the ACK checks below pass.
1768 if(!is_reliable(c)) {
1769 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1770 hdr.ack = c->snd.una;
1774 if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1775 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1777 // Ignore unacceptable RST packets.
1785 // 2. Handle RST packets
1790 if(!(hdr.ctl & ACK)) {
1794 // The peer has refused our connection.
1795 set_state(c, CLOSED);
1796 errno = ECONNREFUSED;
1799 c->recv(c, NULL, 0);
1802 if(c->poll && !c->reapable) {
1813 // We haven't told the application about this connection yet. Silently delete.
1825 // The peer has aborted our connection.
1826 set_state(c, CLOSED);
1830 c->recv(c, NULL, 0);
1833 if(c->poll && !c->reapable) {
1846 // As far as the application is concerned, the connection has already been closed.
1847 // If it has called utcp_close() already, we can immediately free this connection.
1853 // Otherwise, immediately move to the CLOSED state.
1854 set_state(c, CLOSED);
1867 if(!(hdr.ctl & ACK)) {
1872 // 3. Advance snd.una
1874 advanced = seqdiff(hdr.ack, c->snd.una);
// RTT measurement (Karn's algorithm style): only use the sample if this
// ACK covers exactly the timed sequence number; cancel on overshoot.
1878 if(c->rtt_start.tv_sec) {
1879 if(c->rtt_seq == hdr.ack) {
1880 struct timespec now;
1881 clock_gettime(UTCP_CLOCK, &now);
1882 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1883 update_rtt(c, diff);
1884 c->rtt_start.tv_sec = 0;
1885 } else if(c->rtt_seq < hdr.ack) {
1886 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1887 c->rtt_start.tv_sec = 0;
1891 int32_t data_acked = advanced;
1899 // TODO: handle FIN as well.
1904 assert(data_acked >= 0);
1907 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1908 assert(data_acked <= bufused);
1912 buffer_discard(&c->sndbuf, data_acked);
1914 if(is_reliable(c)) {
1919 // Also advance snd.nxt if possible
1920 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1921 c->snd.nxt = hdr.ack;
1924 c->snd.una = hdr.ack;
// Leaving fast recovery: deflate cwnd back to ssthresh (RFC 5681).
1927 if(c->dupack >= 3) {
1928 debug(c, "fast recovery ended\n");
1929 c->snd.cwnd = c->snd.ssthresh;
1935 // Increase the congestion window according to RFC 5681
1936 if(c->snd.cwnd < c->snd.ssthresh) {
1937 c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1939 c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1942 if(c->snd.cwnd > c->sndbuf.maxsize) {
1943 c->snd.cwnd = c->sndbuf.maxsize;
1948 // Check if we have sent a FIN that is now ACKed.
1951 if(c->snd.una == c->snd.last) {
1952 set_state(c, FIN_WAIT_2);
1958 if(c->snd.una == c->snd.last) {
1959 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1960 c->conn_timeout.tv_sec += utcp->timeout;
1961 set_state(c, TIME_WAIT);
// 4. Duplicate-ACK accounting (only meaningful on reliable connections
// with unacknowledged data outstanding).
1970 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1972 debug(c, "duplicate ACK %d\n", c->dupack);
1974 if(c->dupack == 3) {
1975 // RFC 5681 fast recovery
// NOTE(review): this debug() passes an argument (c->dupack) with no
// matching format specifier in the string — harmless but sloppy.
1976 debug(c, "fast recovery started\n", c->dupack);
1977 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1978 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1979 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1981 if(c->snd.cwnd > c->sndbuf.maxsize) {
1982 c->snd.cwnd = c->sndbuf.maxsize;
1988 } else if(c->dupack > 3) {
// Fast recovery inflation: grow cwnd by one MSS per extra dup ACK.
1989 c->snd.cwnd += utcp->mss;
1991 if(c->snd.cwnd > c->sndbuf.maxsize) {
1992 c->snd.cwnd = c->sndbuf.maxsize;
1998 // We got an ACK which indicates the other side did get one of our packets.
1999 // Reset the retransmission timer to avoid going to slow start,
2000 // but don't touch the connection timeout.
2001 start_retransmit_timer(c);
// Everything ACKed: stop timers; otherwise re-arm them.
2008 if(c->snd.una == c->snd.last) {
2009 stop_retransmit_timer(c);
2010 timespec_clear(&c->conn_timeout);
2011 } else if(is_reliable(c)) {
2012 start_retransmit_timer(c);
2013 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2014 c->conn_timeout.tv_sec += utcp->timeout;
2019 // 5. Process SYN stuff
2025 // This is a SYNACK. It should always have ACKed the SYN.
2030 c->rcv.irs = hdr.seq;
2031 c->rcv.nxt = hdr.seq + 1;
2035 set_state(c, FIN_WAIT_1);
2038 set_state(c, ESTABLISHED);
2044 // This is a retransmit of a SYN, send back the SYNACK.
2054 // This could be a retransmission. Ignore the SYN flag, but send an ACK back.
2065 // 6. Process new data
2067 if(c->state == SYN_RECEIVED) {
2068 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
2073 // Are we still LISTENing?
2075 utcp->accept(c, c->src);
2078 if(c->state != ESTABLISHED) {
2079 set_state(c, CLOSED);
2089 // This should never happen.
2104 // Ehm no, We should never receive more data after a FIN.
2114 handle_incoming_data(c, &hdr, ptr, len);
2117 // 7. Process FIN stuff
// For reliable connections only honour the FIN once all preceding data
// has been received (its sequence number immediately follows the data).
2119 if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
2123 // This should never happen.
2130 set_state(c, CLOSE_WAIT);
2134 set_state(c, CLOSING);
2138 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2139 c->conn_timeout.tv_sec += utcp->timeout;
2140 set_state(c, TIME_WAIT);
2147 // Ehm, no. We should never receive a second FIN.
2157 // FIN counts as one sequence number
2161 // Inform the application that the peer closed its end of the connection.
2164 c->recv(c, NULL, 0);
2168 // Now we send something back if:
2169 // - we received data, so we have to send back an ACK
2170 // -> sendatleastone = true
2171 // - or we got an ack, so we should maybe send a bit more data
2172 // -> sendatleastone = false
2174 if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
// Reset path: reuse the received header to build the RST|ACK reply.
2189 hdr.ack = hdr.seq + len;
2191 hdr.ctl = RST | ACK;
2194 print_packet(c, "send", &hdr, sizeof(hdr));
2195 utcp->send(utcp, &hdr, sizeof(hdr));
// Shut down one or both directions of a connection.
// dir is UTCP_SHUT_RD, UTCP_SHUT_WR or UTCP_SHUT_RDWR. Shutting down
// reads just discards incoming data; shutting down writes queues a FIN
// and moves the state machine towards FIN_WAIT_1/CLOSING.
// Returns 0 on success, -1 with errno set on error.
2200 int utcp_shutdown(struct utcp_connection *c, int dir) {
2201 debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
2209 debug(c, "shutdown() called on closed connection\n");
2214 if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
2219 // TCP does not have a provision for stopping incoming packets.
2220 // The best we can do is to just ignore them.
2221 if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
2225 // The rest of the code deals with shutting down writes.
2226 if(dir == UTCP_SHUT_RD) {
2230 // Only process shutting down writes once.
// Unreliable framed connections may still have a partial record queued;
// push it out before sending the FIN.
2248 if(!is_reliable(c) && is_framed(c)) {
2249 flush_unreliable_framed(c);
2252 set_state(c, FIN_WAIT_1);
2260 set_state(c, CLOSING);
2271 ack(c, !is_reliable(c));
// Make sure the FIN is retransmitted if it gets lost.
2273 if(!timespec_isset(&c->rtrx_timeout)) {
2274 start_retransmit_timer(c);
// Abort a connection: move it to CLOSED and, where the state machine
// requires it, send a RST to the peer. Returns false (with errno set)
// if the connection cannot be reset, true otherwise.
// NOTE(review): several state-dependent branches are elided in this
// excerpt; the two set_state(CLOSED) calls below belong to different
// prior states.
2280 static bool reset_connection(struct utcp_connection *c) {
2287 debug(c, "abort() called on closed connection\n");
2304 set_state(c, CLOSED);
2312 set_state(c, CLOSED);
// The RST carries the next sequence number we would have sent.
2322 hdr.seq = c->snd.nxt;
2327 print_packet(c, "send", &hdr, sizeof(hdr));
2328 c->utcp->send(c->utcp, &hdr, sizeof(hdr));
2332 // Closes all the opened connections
// Abort every live connection, notifying the application through the
// recv/poll callbacks saved *before* the reset (reset may clear them).
2333 void utcp_abort_all_connections(struct utcp *utcp) {
2339 for(int i = 0; i < utcp->nconnections; i++) {
2340 struct utcp_connection *c = utcp->connections[i];
// Skip connections that are already dead or awaiting cleanup.
2342 if(c->reapable || c->state == CLOSED) {
2346 utcp_recv_t old_recv = c->recv;
2347 utcp_poll_t old_poll = c->poll;
2349 reset_connection(c);
// recv(c, NULL, 0) is the EOF/abort notification convention.
2353 old_recv(c, NULL, 0);
2356 if(old_poll && !c->reapable) {
// Close a connection gracefully. If unread data remains in the receive
// buffer a graceful close is impossible, so the connection is reset
// instead. Returns 0 on success, -1 on error.
2365 int utcp_close(struct utcp_connection *c) {
2366 debug(c, "closing\n");
2368 if(c->rcvbuf.used) {
2369 debug(c, "receive buffer not empty, resetting\n");
2370 return reset_connection(c) ? 0 : -1;
// NOTE(review): uses the system SHUT_RDWR constant rather than this
// API's own UTCP_SHUT_RDWR — they happen to have the same value, but
// UTCP_SHUT_RDWR would be the consistent choice. Confirm and align.
2373 if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
// Hard-abort a connection (RST to the peer, no graceful FIN exchange).
// Returns 0 on success, -1 (with errno set by reset_connection) on failure.
2383 int utcp_abort(struct utcp_connection *c) {
2384 if(!reset_connection(c)) {
2393 * One call to this function will loop through all connections,
2394 * checking if something needs to be resent or not.
2395 * The return value is the time to the next timeout in milliseconds,
2396 or a negative value if no timeout is pending (i.e. the timeout is infinite).
// Periodic housekeeping: reap CLOSED connections, fire connection and
// retransmission timeouts, run pending polls, and compute the time
// until the next timer expires (returned as a relative timespec).
2398 struct timespec utcp_timeout(struct utcp *utcp) {
2399 struct timespec now;
2400 clock_gettime(UTCP_CLOCK, &now);
// Default horizon: one hour from now if no timer is pending sooner.
2401 struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
2403 for(int i = 0; i < utcp->nconnections; i++) {
2404 struct utcp_connection *c = utcp->connections[i];
2410 // delete connections that have been utcp_close()d.
2411 if(c->state == CLOSED) {
2413 debug(c, "reaping\n");
// Connection (user) timeout expired: notify the application via the
// recv/poll callbacks.
2421 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
2426 c->recv(c, NULL, 0);
2429 if(c->poll && !c->reapable) {
2436 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2437 debug(c, "retransmitting after timeout\n");
// Deferred poll: tell the application how much it may write now.
2442 if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2444 uint32_t len = is_framed(c) ? min(buffer_free(&c->sndbuf), MAX_UNRELIABLE_SIZE) : buffer_free(&c->sndbuf);
2449 } else if(c->state == CLOSED) {
// Track the earliest pending timer across all connections.
2454 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2455 next = c->conn_timeout;
2458 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2459 next = c->rtrx_timeout;
2463 struct timespec diff;
2465 timespec_sub(&next, &now, &diff);
// Return true if any connection is still doing useful work, i.e. is in
// a state other than CLOSED or TIME_WAIT.
2470 bool utcp_is_active(struct utcp *utcp) {
2475 for(int i = 0; i < utcp->nconnections; i++)
2476 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
// Create a new utcp instance.
// accept/pre_accept decide whether incoming SYNs are honoured; send is
// the callback used to transmit packets; priv is opaque user data.
// Returns NULL on allocation failure (errno set by calloc).
2483 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2489 struct utcp *utcp = calloc(1, sizeof(*utcp));
2495 utcp_set_mtu(utcp, DEFAULT_MTU);
// Lazily measure the clock resolution once, in microseconds; used by
// the RTO calculation. Can be overridden via utcp_set_clock_granularity().
2502 if(!CLOCK_GRANULARITY) {
2503 struct timespec res;
2504 clock_getres(UTCP_CLOCK, &res);
2505 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2508 utcp->accept = accept;
2509 utcp->pre_accept = pre_accept;
2512 utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
// Tear down a utcp instance: notify the application about every
// remaining connection, free per-connection buffers, then release the
// connection table itself.
2517 void utcp_exit(struct utcp *utcp) {
2522 for(int i = 0; i < utcp->nconnections; i++) {
2523 struct utcp_connection *c = utcp->connections[i];
// recv(c, NULL, 0) signals EOF/teardown to the application.
2527 c->recv(c, NULL, 0);
2530 if(c->poll && !c->reapable) {
2535 buffer_exit(&c->rcvbuf);
2536 buffer_exit(&c->sndbuf);
2540 free(utcp->connections);
2545 uint16_t utcp_get_mtu(struct utcp *utcp) {
2546 return utcp ? utcp->mtu : 0;
2549 uint16_t utcp_get_mss(struct utcp *utcp) {
2550 return utcp ? utcp->mss : 0;
// Set the MTU for this instance and derive the MSS from it.
// An MTU that does not leave room for at least one payload byte beyond
// the header is rejected. Growing the MTU reallocates the scratch
// packet buffer.
2553 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2558 if(mtu <= sizeof(struct hdr)) {
2562 if(mtu > utcp->mtu) {
// realloc into a temporary so utcp->pkt stays valid on failure.
2563 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2573 utcp->mss = mtu - sizeof(struct hdr);
// Force all pending timers to fire as soon as possible (e.g. after the
// underlying transport came back): retransmit timers are set to "now",
// connection timeouts to a full user-timeout from now, and any ongoing
// RTT measurement plus an inflated RTO are reset.
2576 void utcp_reset_timers(struct utcp *utcp) {
2581 struct timespec now, then;
2583 clock_gettime(UTCP_CLOCK, &now);
2587 then.tv_sec += utcp->timeout;
2589 for(int i = 0; i < utcp->nconnections; i++) {
2590 struct utcp_connection *c = utcp->connections[i];
2596 if(timespec_isset(&c->rtrx_timeout)) {
2597 c->rtrx_timeout = now;
2600 if(timespec_isset(&c->conn_timeout)) {
2601 c->conn_timeout = then;
// Cancel any in-flight RTT sample; it would be skewed by the outage.
2604 c->rtt_start.tv_sec = 0;
2606 if(c->rto > START_RTO) {
2612 int utcp_get_user_timeout(struct utcp *u) {
2613 return u ? u->timeout : 0;
2616 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2618 u->timeout = timeout;
2622 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2623 return c ? c->sndbuf.maxsize : 0;
// Free space in the send buffer, i.e. how much utcp_send() could accept
// right now.
// NOTE(review): the elided lines presumably gate this on connection
// validity/state (compare utcp_get_rcvbuf_free) — confirm before use.
2626 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2636 return buffer_free(&c->sndbuf);
// Set the maximum size of the send buffer.
// maxsize is narrower than size_t; if the assignment truncated the
// requested size, fall back to the maximum representable value (-1
// wraps to the unsigned maximum). Re-evaluate do_poll since the free
// space may have changed.
2643 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2648 c->sndbuf.maxsize = size;
2650 if(c->sndbuf.maxsize != size) {
2651 c->sndbuf.maxsize = -1;
2654 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2657 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2658 return c ? c->rcvbuf.maxsize : 0;
2661 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2662 if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2663 return buffer_free(&c->rcvbuf);
// Set the maximum size of the receive buffer.
// As with utcp_set_sndbuf(), maxsize is narrower than size_t: if the
// assignment truncated the requested size, fall back to the maximum
// representable value (-1 wraps to the unsigned maximum).
2669 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2674 c->rcvbuf.maxsize = size;
2676 if(c->rcvbuf.maxsize != size) {
2677 c->rcvbuf.maxsize = -1;
2681 size_t utcp_get_sendq(struct utcp_connection *c) {
2682 return c->sndbuf.used;
2685 size_t utcp_get_recvq(struct utcp_connection *c) {
2686 return c->rcvbuf.used;
2689 bool utcp_get_nodelay(struct utcp_connection *c) {
2690 return c ? c->nodelay : false;
2693 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2695 c->nodelay = nodelay;
2699 bool utcp_get_keepalive(struct utcp_connection *c) {
2700 return c ? c->keepalive : false;
2703 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2705 c->keepalive = keepalive;
2709 size_t utcp_get_outq(struct utcp_connection *c) {
2710 return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
2713 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
2719 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2722 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2726 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2728 utcp->accept = accept;
2729 utcp->pre_accept = pre_accept;
// Tell uTCP whether we expect the peer to send data soon.
// When expecting, arm the connection timeout so a dead peer is noticed;
// when not expecting, clear it — but only if no unACKed data of our own
// is outstanding (that data still needs the timeout).
2733 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2734 if(!c || c->reapable) {
// Only meaningful while data can still arrive from the peer.
2738 if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2743 // If we expect data, start the connection timer.
2744 if(!timespec_isset(&c->conn_timeout)) {
2745 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2746 c->conn_timeout.tv_sec += c->utcp->timeout;
2749 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2750 if(c->snd.una == c->snd.last) {
2751 timespec_clear(&c->conn_timeout);
// Inform uTCP that the underlying transport went offline (true) or came
// back online (false). Going offline arms connection timeouts (via
// utcp_expect_data); either transition resets retransmit timers, RTT
// measurement, and any inflated RTO.
2756 void utcp_offline(struct utcp *utcp, bool offline) {
2757 struct timespec now;
2758 clock_gettime(UTCP_CLOCK, &now);
2760 for(int i = 0; i < utcp->nconnections; i++) {
2761 struct utcp_connection *c = utcp->connections[i];
2767 utcp_expect_data(c, offline);
// Make pending retransmissions fire immediately once we process timers.
2770 if(timespec_isset(&c->rtrx_timeout)) {
2771 c->rtrx_timeout = now;
// NOTE(review): uses utcp->connections[i] where the local `c` would do;
// cosmetic inconsistency only.
2774 utcp->connections[i]->rtt_start.tv_sec = 0;
2776 if(c->rto > START_RTO) {
2783 void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
2784 utcp->retransmit = cb;
// Override the measured clock granularity (in microseconds).
// Normally initialized from clock_getres() in utcp_init(); this value
// feeds into the RTO calculation.
2787 void utcp_set_clock_granularity(long granularity) {
2788 CLOCK_GRANULARITY = granularity;
2791 int utcp_get_flush_timeout(struct utcp *utcp) {
2792 return utcp->flush_timeout;
2795 void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
2796 utcp->flush_timeout = milliseconds;
2799 bool utcp_get_flush_needed(struct utcp_connection *c) {
2800 bool value = c->flush_needed;
2801 c->flush_needed = false;