2 utcp.c -- Userspace TCP
3 Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
32 #include "utcp_priv.h"
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
50 #define UTCP_CLOCK CLOCK_MONOTONIC
54 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
55 r->tv_sec = a->tv_sec - b->tv_sec;
56 r->tv_nsec = a->tv_nsec - b->tv_nsec;
59 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
63 static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
64 return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000;
/*
 * Strict "less than" for timespecs: seconds dominate, nanoseconds break ties.
 */
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
	if(a->tv_sec != b->tv_sec) {
		return a->tv_sec < b->tv_sec;
	}

	return a->tv_nsec < b->tv_nsec;
}
75 static void timespec_clear(struct timespec *a) {
80 static bool timespec_isset(const struct timespec *a) {
84 static long CLOCK_GRANULARITY; // usec
86 static inline size_t min(size_t a, size_t b) {
90 static inline size_t max(size_t a, size_t b) {
97 #ifndef UTCP_DEBUG_DATALEN
98 #define UTCP_DEBUG_DATALEN 20
/*
 * Emit a timestamped, connection-tagged debug message to stderr.
 * NOTE(review): several lines (buf/ap/tv declarations, va_end, braces) are
 * missing from this view; visible code kept verbatim.
 */
101 static void debug(struct utcp_connection *c, const char *format, ...) {
// Wall-clock timestamp; tv_nsec is scaled down to microseconds in the format.
106 clock_gettime(CLOCK_REALTIME, &tv);
107 len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);
109 va_start(ap, format);
110 len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
// Only emit complete, untruncated messages in a single write.
113 if(len > 0 && (size_t)len < sizeof(buf)) {
114 fwrite(buf, len, 1, stderr);
/*
 * Debug-dump one packet (direction, header fields, control flags and a hex
 * prefix of the payload). No-op side effects beyond logging.
 * NOTE(review): some lines (hdr/datalen declarations, str termination) are
 * missing from this view; visible code kept verbatim.
 */
118 static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
121 if(len < sizeof(hdr)) {
122 debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
// Copy out of the possibly unaligned packet buffer before reading fields.
126 memcpy(&hdr, pkt, sizeof(hdr));
130 if(len > sizeof(hdr)) {
// Show at most UTCP_DEBUG_DATALEN bytes of payload.
131 datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
137 const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
138 char str[datalen * 2 + 1];
// Hex-encode the payload prefix, two characters per byte.
141 for(uint32_t i = 0; i < datalen; i++) {
142 *p++ = "0123456789ABCDEF"[data[i] >> 4];
143 *p++ = "0123456789ABCDEF"[data[i] & 15];
148 debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n",
149 dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
150 hdr.ctl & SYN ? "SYN" : "",
151 hdr.ctl & RST ? "RST" : "",
152 hdr.ctl & FIN ? "FIN" : "",
153 hdr.ctl & ACK ? "ACK" : "",
154 hdr.ctl & MF ? "MF" : "",
/*
 * Log the congestion window and slow-start threshold.
 * ssthresh of ~0 means "not yet set"; print 0 in that case.
 */
159 static void debug_cwnd(struct utcp_connection *c) {
160 debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
163 #define debug(...) do {} while(0)
164 #define print_packet(...) do {} while(0)
165 #define debug_cwnd(...) do {} while(0)
/*
 * Transition the connection to a new state and log it.
 * Reaching ESTABLISHED disarms the connection timeout.
 * NOTE(review): the actual `c->state = state` assignment is in lines missing
 * from this view; visible code kept verbatim.
 */
168 static void set_state(struct utcp_connection *c, enum state state) {
171 if(state == ESTABLISHED) {
172 timespec_clear(&c->conn_timeout);
175 debug(c, "state %s\n", strstate[state]);
/*
 * Decide whether a FIN flag belongs on a segment ending at `seq`:
 * only the final segment (seq == snd.last) in a closing state carries FIN.
 * NOTE(review): the state switch of this function is in lines missing from
 * this view; visible code kept verbatim.
 */
178 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
179 if(seq != c->snd.last) {
194 static bool is_reliable(struct utcp_connection *c) {
195 return c->flags & UTCP_RELIABLE;
198 static bool is_framed(struct utcp_connection *c) {
199 return c->flags & UTCP_FRAMED;
202 static int32_t seqdiff(uint32_t a, uint32_t b) {
207 static bool buffer_wraps(struct buffer *buf) {
208 return buf->size - buf->offset < buf->used;
/*
 * Grow (or shrink) the circular buffer's backing store to newsize bytes,
 * preserving the logical contents.
 * NOTE(review): the realloc NULL-check and the data/size field updates are in
 * lines missing from this view; visible code kept verbatim.
 */
211 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
212 char *newdata = realloc(buf->data, newsize);
220 if(buffer_wraps(buf)) {
221 // Shift the right part of the buffer until it hits the end of the new buffer.
225 // [345.........|........012]
// Move the tail segment so it ends flush with the new allocation end.
226 uint32_t tailsize = buf->size - buf->offset;
227 uint32_t newoffset = newsize - tailsize;
228 memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
229 buf->offset = newoffset;
236 // Store data into the buffer
/*
 * Write `len` bytes at logical position `offset` in the circular buffer,
 * clamping to maxsize, growing the allocation on demand, and handling the
 * wraparound split. Returns the number of bytes stored (visible in the
 * missing lines) or an error.
 * NOTE(review): the doubling loop start, early returns and the final return
 * are in lines missing from this view; visible code kept verbatim.
 */
237 static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
238 debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
240 // Ensure we don't store more than maxsize bytes in total
241 size_t required = offset + len;
243 if(required > buf->maxsize) {
244 if(offset >= buf->maxsize) {
// Clamp the write so it ends exactly at maxsize.
248 len = buf->maxsize - offset;
249 required = buf->maxsize;
252 // Check if we need to resize the buffer
253 if(required > buf->size) {
254 size_t newsize = buf->size;
262 } while(newsize < required);
264 if(newsize > buf->maxsize) {
265 newsize = buf->maxsize;
268 if(!buffer_resize(buf, newsize)) {
// Translate the logical offset into a physical index, wrapping if needed.
273 uint32_t realoffset = buf->offset + offset;
275 if(buf->size - buf->offset <= offset) {
276 // The offset wrapped
277 realoffset -= buf->size;
280 if(buf->size - realoffset < len) {
281 // The new chunk of data must be wrapped
282 memcpy(buf->data + realoffset, data, buf->size - realoffset);
283 memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
285 memcpy(buf->data + realoffset, data, len);
// Extend the used count only if this write grew the logical end.
288 if(required > buf->used) {
289 buf->used = required;
295 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
296 return buffer_put_at(buf, buf->used, data, len);
299 // Copy data from the buffer without removing it.
/*
 * Non-destructive read of up to `len` bytes starting at logical `offset`,
 * handling the circular wraparound. Returns the byte count copied (return
 * statements are in lines missing from this view); visible code kept verbatim.
 */
300 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
301 // Ensure we don't copy more than is actually stored in the buffer
302 if(offset >= buf->used) {
306 if(buf->used - offset < len) {
307 len = buf->used - offset;
// Translate logical offset to a physical index, wrapping if needed.
310 uint32_t realoffset = buf->offset + offset;
312 if(buf->size - buf->offset <= offset) {
313 // The offset wrapped
314 realoffset -= buf->size;
317 if(buf->size - realoffset < len) {
318 // The data is wrapped
319 memcpy(data, buf->data + realoffset, buf->size - realoffset);
320 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
322 memcpy(data, buf->data + realoffset, len);
328 // Copy data from the buffer without removing it.
/*
 * Like buffer_copy(), but delivers the bytes through the connection's recv
 * callback instead of into a caller buffer, splitting the call in two when
 * the data wraps.
 * NOTE(review): rx1 (ssize_t) is compared against an unsigned expression at
 * line 354 — verify the intended signedness; early returns and the handling
 * after the callback possibly closing the channel are in missing lines.
 */
329 static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
334 // Ensure we don't copy more than is actually stored in the buffer
335 if(offset >= buf->used) {
339 if(buf->used - offset < len) {
340 len = buf->used - offset;
343 uint32_t realoffset = buf->offset + offset;
345 if(buf->size - buf->offset <= offset) {
346 // The offset wrapped
347 realoffset -= buf->size;
350 if(buf->size - realoffset < len) {
351 // The data is wrapped
352 ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);
354 if(rx1 < buf->size - realoffset) {
358 // The channel might have been closed by the previous callback
// Second half of the wrapped data starts at the physical beginning.
363 ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));
371 return c->recv(c, buf->data + realoffset, len);
375 // Discard data from the buffer.
/*
 * Drop `len` bytes from the front of the circular buffer.
 * NOTE(review): the offset arithmetic visible here looks wrong in isolation
 * (`buf->offset -= buf->size` with no prior addition would underflow) — the
 * compensating `buf->offset += len`-style lines appear to be missing from
 * this view. Verify against upstream before touching; code kept verbatim.
 */
376 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
377 if(buf->used < len) {
381 if(buf->size - buf->offset <= len) {
382 buf->offset -= buf->size;
// When everything is consumed, the offset can be reset (rest in missing lines).
385 if(buf->used == len) {
396 static void buffer_clear(struct buffer *buf) {
/*
 * Set the buffer's soft limit (maxsize) and make sure at least minsize bytes
 * are allocated. Returns false only if the allocation fails.
 * NOTE(review): the body of the `maxsize < minsize` branch is in a line
 * missing from this view (presumably clamping maxsize up to minsize);
 * visible code kept verbatim.
 */
401 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
402 if(maxsize < minsize) {
406 buf->maxsize = maxsize;
// Grow lazily: only reallocate when the current allocation is too small.
408 return buf->size >= minsize || buffer_resize(buf, minsize);
411 static void buffer_exit(struct buffer *buf) {
413 memset(buf, 0, sizeof(*buf));
416 static uint32_t buffer_free(const struct buffer *buf) {
417 return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0;
420 // Connections are stored in a sorted list.
421 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
/*
 * qsort()/bsearch() comparator for the sorted connection array: order by
 * source port, then destination port.
 * NOTE(review): the intermediate `if(c) return c;` and final return are in
 * lines missing from this view; visible code kept verbatim.
 */
423 static int compare(const void *va, const void *vb) {
426 const struct utcp_connection *a = *(struct utcp_connection **)va;
427 const struct utcp_connection *b = *(struct utcp_connection **)vb;
430 assert(a->src && b->src);
// Widen to int so the subtraction cannot overflow for uint16_t ports.
432 int c = (int)a->src - (int)b->src;
438 c = (int)a->dst - (int)b->dst;
/*
 * O(log n) lookup of the connection matching (src, dst) in the sorted array;
 * returns NULL when absent.
 * NOTE(review): the key initializers and the `keyp` pointer definition are in
 * lines missing from this view; visible code kept verbatim.
 */
442 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
443 if(!utcp->nconnections) {
// Build a stack key and search the array of pointers with the same comparator
// used for sorting.
447 struct utcp_connection key = {
451 struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
452 return match ? *match : NULL;
/*
 * Remove a connection from the sorted array and release its buffers.
 * NOTE(review): the NULL-check on cp and the final free(c) are in lines
 * missing from this view; visible code kept verbatim.
 */
455 static void free_connection(struct utcp_connection *c) {
456 struct utcp *utcp = c->utcp;
457 struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
// Close the gap left by the removed pointer; O(n) but deletions are rare.
461 int i = cp - utcp->connections;
462 memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
463 utcp->nconnections--;
465 buffer_exit(&c->rcvbuf);
466 buffer_exit(&c->sndbuf);
/*
 * Create a new connection object for (src, dst). When src == 0 a random
 * local port with the high bit set is chosen. The connections array grows by
 * doubling; the new entry is inserted and the array re-sorted.
 * Returns NULL on duplicate, port exhaustion, or allocation failure.
 * NOTE(review): many lines (error returns, field initialisation such as
 * c->src/c->dst/c->utcp and the ISS choice, free-on-failure paths) are
 * missing from this view; visible code kept verbatim.
 */
470 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
471 // Check whether this combination of src and dst is free
474 if(find_connection(utcp, src, dst)) {
478 } else { // If src == 0, generate a random port number with the high bit set
// Cap at half the port space so the retry loop below terminates.
479 if(utcp->nconnections >= 32767) {
484 src = rand() | 0x8000;
486 while(find_connection(utcp, src, dst)) {
491 // Allocate memory for the new connection
493 if(utcp->nconnections >= utcp->nallocated) {
494 if(!utcp->nallocated) {
495 utcp->nallocated = 4;
497 utcp->nallocated *= 2;
// realloc result checked into a temporary — the old array stays valid on failure.
500 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
506 utcp->connections = new_array;
509 struct utcp_connection *c = calloc(1, sizeof(*c));
515 if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
520 if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
521 buffer_exit(&c->sndbuf);
526 // Fill in the details
535 c->snd.una = c->snd.iss;
536 c->snd.nxt = c->snd.iss + 1;
537 c->snd.last = c->snd.nxt;
// RFC 6928-style initial window scaled by MSS.
538 c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
// ~0 marks "slow start threshold not yet set".
539 c->snd.ssthresh = ~0;
546 // Add it to the sorted list of connections
548 utcp->connections[utcp->nconnections++] = c;
549 qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
554 static inline uint32_t absdiff(uint32_t a, uint32_t b) {
562 // Update RTT variables. See RFC 6298.
/*
 * Fold a new RTT sample (microseconds) into srtt/rttvar and recompute the
 * retransmission timeout, clamped to MAX_RTO.
 * NOTE(review): the first-sample branch (srtt = rtt, rttvar = rtt/2) and the
 * clamp body are in lines missing from this view; visible code kept verbatim.
 */
563 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
565 debug(c, "invalid rtt\n");
// RFC 6298 exponential smoothing: alpha = 1/8, beta = 1/4.
573 c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4;
574 c->srtt = (c->srtt * 7 + rtt) / 8;
// RTO = SRTT + max(G, 4*RTTVAR), per RFC 6298 eq. 2.3.
577 c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY);
579 if(c->rto > MAX_RTO) {
583 debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto);
/*
 * Arm the retransmission timer at now + rto (rto is in microseconds).
 * NOTE(review): the loop body presumably contains `rto -= USEC_PER_SEC;` in a
 * line missing from this view — without it the loop would not terminate;
 * visible code kept verbatim. The debug format prints tv_nsec with %06lu,
 * which renders nanoseconds in a microsecond-looking field.
 */
586 static void start_retransmit_timer(struct utcp_connection *c) {
587 clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
589 uint32_t rto = c->rto;
// Split the timeout into whole seconds plus a sub-second remainder.
591 while(rto > USEC_PER_SEC) {
592 c->rtrx_timeout.tv_sec++;
596 c->rtrx_timeout.tv_nsec += rto * 1000;
// Normalise tv_nsec back into [0, NSEC_PER_SEC).
598 if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
599 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
600 c->rtrx_timeout.tv_sec++;
603 debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
/*
 * Reuse the rtrx_timeout slot as a short "flush" timer for partially-filled
 * unreliable framed packets: now + flush_timeout milliseconds.
 */
606 static void start_flush_timer(struct utcp_connection *c) {
607 clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
// flush_timeout is in milliseconds; convert to nanoseconds.
609 c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;
// Normalise tv_nsec back into [0, NSEC_PER_SEC).
611 if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
612 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
613 c->rtrx_timeout.tv_sec++;
616 debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
// Disarm the retransmission/flush timer.
619 static void stop_retransmit_timer(struct utcp_connection *c) {
620 timespec_clear(&c->rtrx_timeout);
621 debug(c, "rtrx_timeout cleared\n");
/*
 * Active open: allocate a connection to `dst`, send the initial SYN carrying
 * the 4-byte init auxiliary header (version + flags), arm the connection and
 * retransmission timers, and enter SYN_SENT.
 * NOTE(review): the NULL check after allocation, recv/priv assignment, the
 * packet struct definition and init[0..2] values are in lines missing from
 * this view; visible code kept verbatim.
 */
624 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
625 struct utcp_connection *c = allocate_connection(utcp, 0, dst);
// Only the low 5 flag bits are defined.
631 assert((flags & ~0x1f) == 0);
642 pkt.hdr.src = c->src;
643 pkt.hdr.dst = c->dst;
644 pkt.hdr.seq = c->snd.iss;
646 pkt.hdr.wnd = c->rcvbuf.maxsize;
// aux 0x0101: one 4-byte auxiliary header of type 1 (init) follows.
648 pkt.hdr.aux = 0x0101;
652 pkt.init[3] = flags & 0x7;
654 set_state(c, SYN_SENT);
656 print_packet(c, "send", &pkt, sizeof(pkt));
657 utcp->send(utcp, &pkt, sizeof(pkt));
// Overall connection deadline, independent of per-packet retransmits.
659 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
660 c->conn_timeout.tv_sec += utcp->timeout;
662 start_retransmit_timer(c);
// Convenience wrapper: open a standard reliable (TCP-like) connection.
667 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
668 return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
671 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
672 if(c->reapable || c->state != SYN_RECEIVED) {
673 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
677 debug(c, "accepted %p %p\n", c, recv, priv);
681 set_state(c, ESTABLISHED);
/*
 * Transmit pending data (and/or a bare ACK when sendatleastone is set),
 * segmenting the send buffer into MSS-sized packets limited by the smaller
 * of the congestion and receiver windows. Reliable connections start an RTT
 * measurement on the first in-flight segment; unreliable ones advance
 * snd.una immediately and encode fragment offsets in hdr.wnd.
 * NOTE(review): the packet struct definition, loop header, FIN/ctl handling
 * and several branch bodies are in lines missing from this view; visible
 * code kept verbatim.
 */
684 static void ack(struct utcp_connection *c, bool sendatleastone) {
685 int32_t left = seqdiff(c->snd.last, c->snd.nxt);
// Effective window: min(cwnd, rwnd) minus bytes already in flight.
686 int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE;
692 } else if(cwndleft < left) {
// Send only whole MSS multiples unless we must emit something.
695 if(!sendatleastone || cwndleft > c->utcp->mss) {
696 left -= left % c->utcp->mss;
700 debug(c, "cwndleft %d left %d\n", cwndleft, left);
702 if(!left && !sendatleastone) {
709 } *pkt = c->utcp->pkt;
711 pkt->hdr.src = c->src;
712 pkt->hdr.dst = c->dst;
713 pkt->hdr.ack = c->rcv.nxt;
// Unreliable mode repurposes wnd as the fragment offset, so start at 0.
714 pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0;
719 uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
720 pkt->hdr.seq = c->snd.nxt;
722 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
724 c->snd.nxt += seglen;
727 if(!is_reliable(c)) {
735 if(seglen && fin_wanted(c, c->snd.nxt)) {
// Only one RTT measurement runs at a time (Karn's algorithm territory).
740 if(!c->rtt_start.tv_sec && is_reliable(c)) {
741 // Start RTT measurement
742 clock_gettime(UTCP_CLOCK, &c->rtt_start);
743 c->rtt_seq = pkt->hdr.seq + seglen;
744 debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
747 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
748 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
// Unreliable fragments: advance the fragment offset carried in wnd.
750 if(left && !is_reliable(c)) {
751 pkt->hdr.wnd += seglen;
/*
 * Queue data on a reliable connection. Framed mode prepends a 16-bit length
 * and requires the whole frame to fit; NO_PARTIAL refuses writes that cannot
 * be fully buffered. Actual transmission is deferred via ack() once the
 * handshake completes, and the retransmit/connection timers are armed.
 * NOTE(review): several error returns, the ack() call and the final return
 * are in lines missing from this view; visible code kept verbatim.
 */
756 static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
// Framed payloads carry a 2-byte length prefix.
757 size_t rlen = len + (is_framed(c) ? 2 : 0);
763 // Check if we need to be able to buffer all data
765 if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
766 if(rlen > c->sndbuf.maxsize) {
// A frame length must fit in the 16-bit prefix.
771 if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
776 if(rlen > buffer_free(&c->sndbuf)) {
782 // Add data to the send buffer.
785 uint16_t len16 = len;
786 buffer_put(&c->sndbuf, &len16, sizeof(len16));
787 assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
789 len = buffer_put(&c->sndbuf, data, len);
799 // Don't send anything yet if the connection has not fully established yet
801 if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
807 if(!timespec_isset(&c->rtrx_timeout)) {
808 start_retransmit_timer(c);
811 if(!timespec_isset(&c->conn_timeout)) {
812 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
813 c->conn_timeout.tv_sec += c->utcp->timeout;
820 /* In the send buffer we can have multiple frames, each prefixed with their
821 length. However, the first frame might already have been partially sent. The
822 variable c->frame_offset tracks how much of a partial frame is left at the
823 start. If it is 0, it means there is no partial frame, and the first two
824 bytes in the send buffer are the length of the first frame.
826 After sending an MSS sized packet, we need to calculate the new frame_offset
827 value, since it is likely that the next packet will also have a partial frame
828 at the start. We do this by scanning the previously sent packet for frame
829 headers, to find out how many bytes of the last frame are left to send.
/*
 * Send full-MSS packets of framed unreliable data. hdr.wnd carries the
 * offset of the first frame boundary within the packet (c->frame_offset) so
 * the receiver can resynchronise; MF marks framed segments. Partially-filled
 * trailing data stays buffered and is covered by the flush timer.
 * NOTE(review): the packet struct definition, the framelen declaration and
 * the tail of the sent_packet branch are in lines missing from this view;
 * visible code kept verbatim.
 */
831 static void ack_unreliable_framed(struct utcp_connection *c) {
832 int32_t left = seqdiff(c->snd.last, c->snd.nxt);
838 } *pkt = c->utcp->pkt;
840 pkt->hdr.src = c->src;
841 pkt->hdr.dst = c->dst;
842 pkt->hdr.ack = c->rcv.nxt;
843 pkt->hdr.ctl = ACK | MF;
846 bool sent_packet = false;
// Only emit completely full segments here; the remainder waits for more data
// or for the flush timer.
848 while(left >= c->utcp->mss) {
849 pkt->hdr.wnd = c->frame_offset;
850 uint32_t seglen = c->utcp->mss;
852 pkt->hdr.seq = c->snd.nxt;
854 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
// Unreliable: nothing is retransmitted, so una tracks nxt directly.
856 c->snd.nxt += seglen;
857 c->snd.una = c->snd.nxt;
860 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
861 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
864 // Calculate the new frame offset
// Walk the frame headers inside the just-sent segment to find where the
// next frame boundary lands relative to the new buffer start.
865 while(c->frame_offset < seglen) {
867 buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
868 c->frame_offset += framelen + 2;
871 buffer_discard(&c->sndbuf, seglen);
872 c->frame_offset -= seglen;
877 // We sent one packet but we have partial data left, (re)start the flush timer
878 start_flush_timer(c);
880 // There is no partial data in the send buffer, so stop the flush timer
881 stop_retransmit_timer(c);
883 } else if(left && !timespec_isset(&c->rtrx_timeout)) {
884 // We have partial data and we didn't start the flush timer yet
885 start_flush_timer(c);
/*
 * Flush-timer handler for unreliable framed mode: push out the remaining
 * sub-MSS tail as one final MF segment and disarm the timer.
 * NOTE(review): the packet struct definition and the `if(left)` guard around
 * the send are in lines missing from this view; visible code kept verbatim.
 */
889 static void flush_unreliable_framed(struct utcp_connection *c) {
890 int32_t left = seqdiff(c->snd.last, c->snd.nxt);
892 /* If the MSS dropped since last time ack_unreliable_frame() was called,
893 we might now have more than one segment worth of data left.
// Drain full segments first so at most one partial segment remains.
895 if(left > c->utcp->mss) {
896 ack_unreliable_framed(c);
897 left = seqdiff(c->snd.last, c->snd.nxt);
898 assert(left <= c->utcp->mss);
905 } *pkt = c->utcp->pkt;
907 pkt->hdr.src = c->src;
908 pkt->hdr.dst = c->dst;
909 pkt->hdr.seq = c->snd.nxt;
910 pkt->hdr.ack = c->rcv.nxt;
// wnd carries the first frame boundary offset, as in ack_unreliable_framed().
911 pkt->hdr.wnd = c->frame_offset;
912 pkt->hdr.ctl = ACK | MF;
915 uint32_t seglen = left;
917 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
918 buffer_discard(&c->sndbuf, seglen);
920 c->snd.nxt += seglen;
921 c->snd.una = c->snd.nxt;
923 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
924 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
928 stop_retransmit_timer(c);
/*
 * Queue one datagram/frame on an unreliable connection. Framed mode buffers
 * the length-prefixed frame and lets ack_unreliable_framed() segment it;
 * plain mode sends immediately and then drops everything from the buffer.
 * NOTE(review): error returns, snd.last advancement and the non-framed send
 * path are in lines missing from this view; visible code kept verbatim.
 */
932 static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
// Frame length must fit the 16-bit prefix / 64 kiB reassembly limit.
933 if(len > MAX_UNRELIABLE_SIZE) {
938 size_t rlen = len + (is_framed(c) ? 2 : 0);
940 if(rlen > buffer_free(&c->sndbuf)) {
941 if(rlen > c->sndbuf.maxsize) {
950 // Don't send anything yet if the connection has not fully established yet
952 if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
957 uint16_t framelen = len;
958 buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
961 buffer_put(&c->sndbuf, data, len);
966 ack_unreliable_framed(c);
// Non-framed: everything was sent right away, nothing is kept for retransmit.
969 c->snd.una = c->snd.nxt = c->snd.last;
970 buffer_discard(&c->sndbuf, c->sndbuf.used);
/*
 * Public send entry point: validate the connection state (rejecting closed,
 * unconnected or shut-down connections — checks live in the missing lines),
 * then dispatch to the reliable or unreliable implementation.
 * Visible code kept verbatim; most of the state switch is not in this view.
 */
976 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
978 debug(c, "send() called on closed connection\n");
986 debug(c, "send() called on unconnected connection\n");
1001 debug(c, "send() called on closed connection\n");
1011 if(is_reliable(c)) {
1012 return utcp_send_reliable(c, data, len);
1014 return utcp_send_unreliable(c, data, len);
1018 static void swap_ports(struct hdr *hdr) {
1019 uint16_t tmp = hdr->src;
1020 hdr->src = hdr->dst;
/*
 * Fast retransmit (triggered by duplicate ACKs): resend one MSS-sized
 * segment starting at snd.una without waiting for the RTO.
 * NOTE(review): the packet struct definition, ctl initialisation and state
 * switch are in lines missing from this view; visible code kept verbatim.
 */
1024 static void fast_retransmit(struct utcp_connection *c) {
1025 if(c->state == CLOSED || c->snd.last == c->snd.una) {
1026 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
1030 struct utcp *utcp = c->utcp;
1035 } *pkt = c->utcp->pkt;
1037 pkt->hdr.src = c->src;
1038 pkt->hdr.dst = c->dst;
1039 pkt->hdr.wnd = c->rcvbuf.maxsize;
1048 // Send unacked data again.
1049 pkt->hdr.seq = c->snd.una;
1050 pkt->hdr.ack = c->rcv.nxt;
// Resend at most one MSS worth of the oldest unacknowledged data.
1052 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1054 if(fin_wanted(c, c->snd.una + len)) {
1056 pkt->hdr.ctl |= FIN;
// Unacked data starts at buffer offset 0 (snd.una).
1059 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1060 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1061 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
/*
 * RTO expiry handler. Depending on state: resend the SYN (SYN_SENT), the
 * SYN|ACK (SYN_RECEIVED), or the oldest unacked data segment (established
 * states), applying RFC 5681 slow start after timeout. Unreliable framed
 * connections flush their pending tail instead. Doubles the RTO (visible in
 * missing lines) and restarts the timer.
 * NOTE(review): the state switch skeleton, packet struct definition, ctl
 * setup and RTO back-off are in lines missing from this view; visible code
 * kept verbatim.
 */
1069 static void retransmit(struct utcp_connection *c) {
1070 if(c->state == CLOSED || c->snd.last == c->snd.una) {
1071 debug(c, "retransmit() called but nothing to retransmit!\n");
1072 stop_retransmit_timer(c);
1076 struct utcp *utcp = c->utcp;
// Give the application a chance to react (e.g. lower the MTU) first.
1078 if(utcp->retransmit && is_reliable(c)) {
1079 utcp->retransmit(c);
1085 } *pkt = c->utcp->pkt;
1087 pkt->hdr.src = c->src;
1088 pkt->hdr.dst = c->dst;
1089 pkt->hdr.wnd = c->rcvbuf.maxsize;
1094 // Send our SYN again
1095 pkt->hdr.seq = c->snd.iss;
// aux 0x0101: one 4-byte init auxiliary header follows.
1098 pkt->hdr.aux = 0x0101;
1102 pkt->data[3] = c->flags & 0x7;
1103 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
1104 utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
1108 // Send SYNACK again
1109 pkt->hdr.seq = c->snd.nxt;
1110 pkt->hdr.ack = c->rcv.nxt;
1111 pkt->hdr.ctl = SYN | ACK;
1112 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
1113 utcp->send(utcp, pkt, sizeof(pkt->hdr));
// Unreliable framed data is never retransmitted, only flushed.
1121 if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
1122 flush_unreliable_framed(c);
1126 // Send unacked data again.
1127 pkt->hdr.seq = c->snd.una;
1128 pkt->hdr.ack = c->rcv.nxt;
1130 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1132 if(fin_wanted(c, c->snd.una + len)) {
1134 pkt->hdr.ctl |= FIN;
1137 // RFC 5681 slow start after timeout
1138 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1139 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1140 c->snd.cwnd = utcp->mss;
1143 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1144 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1145 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
// Go-back-N: everything after the resent segment is considered lost.
1147 c->snd.nxt = c->snd.una + len;
1154 // We shouldn't need to retransmit anything in this state.
1158 stop_retransmit_timer(c);
1162 start_retransmit_timer(c);
1165 if(c->rto > MAX_RTO) {
// A timeout invalidates any in-progress RTT sample (Karn's algorithm).
1169 c->rtt_start.tv_sec = 0; // invalidate RTT timer
1170 c->dupack = 0; // cancel any ongoing fast recovery
1176 /* Update receive buffer and SACK entries after consuming data.
1180 * |.....0000..1111111111.....22222......3333|
1183 * 0..3 represent the SACK entries. The ^ indicates up to which point we want
1184 * to remove data from the receive buffer. The idea is to substract "len"
1185 * from the offset of all the SACK entries, and then remove/cut down entries
1186 * that are shifted to before the start of the receive buffer.
1188 * There are three cases:
1189 * - the SACK entry is after ^, in that case just change the offset.
1190 * - the SACK entry starts before and ends after ^, so we have to
1191 * change both its offset and size.
1192 * - the SACK entry is completely before ^, in that case delete it.
/*
 * After delivering `len` in-order bytes to the application, drop them from
 * the receive buffer and shift all SACK entries left by `len`, trimming or
 * deleting entries that fall before the new buffer start (see the diagram in
 * the comment block above this function).
 * NOTE(review): the else-branch structure and `i++` placement are in lines
 * missing from this view; visible code kept verbatim.
 */
1194 static void sack_consume(struct utcp_connection *c, size_t len) {
1195 debug(c, "sack_consume %lu\n", (unsigned long)len);
1197 if(len > c->rcvbuf.used) {
1198 debug(c, "all SACK entries consumed\n");
1199 c->sacks[0].len = 0;
1203 buffer_discard(&c->rcvbuf, len);
1205 for(int i = 0; i < NSACKS && c->sacks[i].len;) {
// Case 1: entry entirely after the consumed span — just shift it.
1206 if(len < c->sacks[i].offset) {
1207 c->sacks[i].offset -= len;
// Case 2: entry straddles the consumption point — cut its head.
1209 } else if(len < c->sacks[i].offset + c->sacks[i].len) {
1210 c->sacks[i].len -= len - c->sacks[i].offset;
1211 c->sacks[i].offset = 0;
// Case 3: entry completely consumed — delete and compact the array.
1214 if(i < NSACKS - 1) {
1215 memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
1216 c->sacks[NSACKS - 1].len = 0;
1218 c->sacks[i].len = 0;
1224 for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1225 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
/*
 * Store an out-of-order segment in the receive buffer at `offset` past
 * rcv.nxt and record/merge it into the sorted SACK entry list so it can be
 * delivered once the gap is filled.
 * NOTE(review): some break/return statements and the merge-tail handling are
 * in lines missing from this view; visible code kept verbatim.
 */
1229 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1230 debug(c, "out of order packet, offset %u\n", offset);
1231 // Packet loss or reordering occured. Store the data in the buffer.
1232 ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
1235 debug(c, "packet outside receive buffer, dropping\n");
// Partial store: only the part that fit is tracked below.
1239 if((size_t)rxd < len) {
1240 debug(c, "packet partially outside receive buffer\n");
1244 // Make note of where we put it.
1245 for(int i = 0; i < NSACKS; i++) {
1246 if(!c->sacks[i].len) { // nothing to merge, add new entry
1247 debug(c, "new SACK entry %d\n", i);
1248 c->sacks[i].offset = offset;
1249 c->sacks[i].len = rxd;
1251 } else if(offset < c->sacks[i].offset) {
1252 if(offset + rxd < c->sacks[i].offset) { // insert before
1253 if(!c->sacks[NSACKS - 1].len) { // only if room left
1254 debug(c, "insert SACK entry at %d\n", i);
1255 memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
1256 c->sacks[i].offset = offset;
1257 c->sacks[i].len = rxd;
1259 debug(c, "SACK entries full, dropping packet\n");
// New data abuts/overlaps the front of an existing entry — extend it left.
1264 debug(c, "merge with start of SACK entry at %d\n", i);
1265 c->sacks[i].offset = offset;
1268 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1269 if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1270 debug(c, "merge with end of SACK entry at %d\n", i);
1271 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1272 // TODO: handle potential merge with next entry
1279 for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1280 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1284 static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1285 uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1287 // Put the data into the receive buffer
1288 handle_out_of_order(c, offset + in_order_offset, data, len);
/*
 * Deliver an in-order segment straight to the application, then check
 * whether it closes the gap before the first SACK entry; if so, deliver the
 * newly contiguous buffered data too and shift the SACK bookkeeping.
 * NOTE(review): rcv.nxt advancement and error handling for a short recv are
 * in lines missing from this view; visible code kept verbatim.
 */
1291 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1293 ssize_t rxd = c->recv(c, data, len);
1295 if(rxd != (ssize_t)len) {
1296 // TODO: handle the application not accepting all data.
1301 // Check if we can process out-of-order data now.
// The segment touches (or overlaps) the first SACK entry: the gap is closed.
1302 if(c->sacks[0].len && len >= c->sacks[0].offset) {
1303 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1305 if(len < c->sacks[0].offset + c->sacks[0].len) {
1306 size_t offset = len;
1307 len = c->sacks[0].offset + c->sacks[0].len;
1308 size_t remainder = len - offset;
// Deliver the buffered continuation via the recv callback.
1310 ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);
1312 if(rxd != (ssize_t)remainder) {
1313 // TODO: handle the application not accepting all data.
1319 if(c->rcvbuf.used) {
1320 sack_consume(c, len);
1326 static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
1327 // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
1328 uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1329 handle_out_of_order(c, in_order_offset, data, len);
1331 // While we have full frames at the start, give them to the application
1332 while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
1334 buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
1336 if(framelen > c->sacks[0].len - 2) {
1342 uint32_t realoffset = c->rcvbuf.offset + 2;
1344 if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
1345 realoffset -= c->rcvbuf.size;
1348 if(realoffset > c->rcvbuf.size - framelen) {
1349 // The buffer wraps, we need to copy
1350 uint8_t buf[framelen];
1351 buffer_copy(&c->rcvbuf, buf, 2, framelen);
1352 rxd = c->recv(c, buf, framelen);
1354 // The frame is contiguous in the receive buffer
1355 rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
1358 if(rxd != (ssize_t)framelen) {
1359 // TODO: handle the application not accepting all data.
1364 sack_consume(c, framelen + 2);
/*
 * Unreliable, non-framed receive path. hdr.wnd carries the fragment offset
 * and MF marks "more fragments". Unfragmented packets go straight to the
 * application; fragments are reassembled (in order only) in the receive
 * buffer, capped at MAX_UNRELIABLE_SIZE, and delivered on the last fragment.
 * NOTE(review): several returns/closing braces are in lines missing from
 * this view; visible code kept verbatim.
 */
1370 static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1371 // Fast path for unfragmented packets
1372 if(!hdr->wnd && !(hdr->ctl & MF)) {
1374 c->recv(c, data, len);
1377 c->rcv.nxt = hdr->seq + len;
1381 // Ensure reassembled packet are not larger than 64 kiB
1382 if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
1386 // Don't accept out of order fragments
1387 if(hdr->wnd && hdr->seq != c->rcv.nxt) {
1391 // Reset the receive buffer for the first fragment
1393 buffer_clear(&c->rcvbuf);
1396 ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len);
1398 if(rxd != (ssize_t)len) {
1402 // Send the packet if it's the final fragment
1403 if(!(hdr->ctl & MF)) {
1404 buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);
1407 c->rcv.nxt = hdr->seq + len;
/*
 * Unreliable framed receive path. hdr.wnd is the offset of the first frame
 * boundary within the payload; the leading partial-frame bytes are appended
 * to the receive buffer (only when in sequence), whole frames are delivered
 * directly from the packet, and a trailing partial frame is buffered.
 * NOTE(review): the loop header, `left` bookkeeping, framelen declaration
 * and several branch bodies are in lines missing from this view; visible
 * code kept verbatim.
 */
1410 static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1411 bool in_order = hdr->seq == c->rcv.nxt;
1412 c->rcv.nxt = hdr->seq + len;
1414 const uint8_t *ptr = data;
1417 // Does it start with a partial frame?
1419 // Only accept the data if it is in order
1420 if(in_order && c->rcvbuf.used) {
1421 // In order, append it to the receive buffer
1422 buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
1424 if(hdr->wnd <= len) {
1425 // We have a full frame
// Skip the 2-byte length prefix at the buffer start when delivering.
1426 c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
1430 // Exit early if there is other data in this frame
1431 if(hdr->wnd > len) {
1433 buffer_clear(&c->rcvbuf);
1443 // We now start with new frames, so clear any data in the receive buffer
1444 buffer_clear(&c->rcvbuf);
1446 // Handle whole frames
1449 memcpy(&framelen, ptr, sizeof(framelen));
// Not enough bytes left for this frame: it spills into the next packet.
1451 if(left <= (size_t)framelen + 2) {
1455 c->recv(c, ptr + 2, framelen);
1456 ptr += framelen + 2;
1457 left -= framelen + 2;
1460 // Handle partial last frame
1462 buffer_put(&c->rcvbuf, ptr, left);
/*
 * Dispatch received payload to the right handler based on the connection's
 * reliability and framing flags, and (reliable path) on whether the segment
 * arrived in sequence.
 * NOTE(review): the is_framed() branches and rcv.nxt updates are in lines
 * missing from this view; visible code kept verbatim.
 */
1466 static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1467 if(!is_reliable(c)) {
1469 handle_unreliable_framed(c, hdr, data, len);
1471 handle_unreliable(c, hdr, data, len);
// Reliable: position relative to the next expected sequence number.
1477 uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
1481 handle_out_of_order_framed(c, offset, data, len);
1483 handle_in_order_framed(c, data, len);
1487 handle_out_of_order(c, offset, data, len);
1489 handle_in_order(c, data, len);
// Feed one received datagram into the UTCP state machine.
// `data`/`len` is exactly one packet as it came off the wire (header + payload).
// Returns the number of bytes consumed, or -1 with errno set on malformed input.
// This is the heart of the receive path: it validates the header, matches or
// creates a connection, advances snd.una/rcv.nxt, runs RFC 5681 congestion
// control, and finally ACKs or RSTs as appropriate.
1495 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1496 const uint8_t *ptr = data;
1512 // Drop packets smaller than the header
1516 if(len < sizeof(hdr)) {
1517 print_packet(NULL, "recv", data, len);
1522 // Make a copy from the potentially unaligned data to a struct hdr
1524 memcpy(&hdr, ptr, sizeof(hdr));
1526 // Try to match the packet to an existing connection
1528 struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1529 print_packet(c, "recv", data, len);
1531 // Process the header
1536 // Drop packets with an unknown CTL flag
1538 if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) {
1539 print_packet(NULL, "recv", data, len);
1544 // Check for auxiliary headers
1546 const uint8_t *init = NULL;
1548 uint16_t aux = hdr.aux;
// NOTE(review): `&` binds looser than `*`, so this computes
// (4 * (aux >> 8)) & 0xf rather than 4 * ((aux >> 8) & 0xf).
// Both give 4 for the standard 0x0101 init header, but they diverge for
// larger length nibbles — confirm the intended precedence.
1551 size_t auxlen = 4 * (aux >> 8) & 0xf;
1552 uint8_t auxtype = aux & 0xff;
1561 if(!(hdr.ctl & SYN) || auxlen != 4) {
// Bit 0x800 marks "more auxiliary headers follow".
1577 if(!(aux & 0x800)) {
1586 memcpy(&aux, ptr, 2);
// SYN and FIN consume a sequence number, so they count as data here.
1591 bool has_data = len || (hdr.ctl & (SYN | FIN));
1593 // Is it for a new connection?
1596 // Ignore RST packets
1602 // Is it a SYN packet and are we LISTENing?
1604 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1605 // If we don't want to accept it, send a RST back
1606 if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1611 // Try to allocate memory, otherwise send a RST back
1612 c = allocate_connection(utcp, hdr.dst, hdr.src);
1619 // Parse auxilliary information
// Low three bits of the init header select the channel type (UTCP_TCP/UDP...).
1626 c->flags = init[3] & 0x7;
1628 c->flags = UTCP_TCP;
1632 // Return SYN+ACK, go to SYN_RECEIVED state
1633 c->snd.wnd = hdr.wnd;
1634 c->rcv.irs = hdr.seq;
1635 c->rcv.nxt = c->rcv.irs + 1;
1636 set_state(c, SYN_RECEIVED);
1643 pkt.hdr.src = c->src;
1644 pkt.hdr.dst = c->dst;
1645 pkt.hdr.ack = c->rcv.irs + 1;
1646 pkt.hdr.seq = c->snd.iss;
1647 pkt.hdr.wnd = c->rcvbuf.maxsize;
1648 pkt.hdr.ctl = SYN | ACK;
// 0x0101: one 4-byte auxiliary init header follows.
1651 pkt.hdr.aux = 0x0101;
1655 pkt.data[3] = c->flags & 0x7;
1656 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1657 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1660 print_packet(c, "send", &pkt, sizeof(hdr));
1661 utcp->send(utcp, &pkt, sizeof(hdr));
1664 start_retransmit_timer(c);
1666 // No, we don't want your packets, send a RST back
1674 debug(c, "state %s\n", strstate[c->state]);
1676 // In case this is for a CLOSED connection, ignore the packet.
1677 // TODO: make it so incoming packets can never match a CLOSED connection.
1679 if(c->state == CLOSED) {
1680 debug(c, "got packet for closed connection\n");
1684 // It is for an existing connection.
1686 // 1. Drop invalid packets.
1688 // 1a. Drop packets that should not happen in our current state.
1709 // 1b. Discard data that is not in our receive window.
1711 if(is_reliable(c)) {
1714 if(c->state == SYN_SENT) {
1716 } else if(len == 0) {
1717 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1719 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1721 // cut already accepted front overlapping
// A retransmission may overlap data we already delivered; drop the
// already-accepted prefix and shift seq/ptr/len to the new part.
1722 if(rcv_offset < 0) {
1723 acceptable = len > (size_t) - rcv_offset;
1728 hdr.seq -= rcv_offset;
1731 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1736 debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1738 // Ignore unacceptable RST packets.
1743 // Otherwise, continue processing.
1748 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1751 debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
1757 c->snd.wnd = hdr.wnd; // TODO: move below
1759 // 1c. Drop packets with an invalid ACK.
1760 // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1761 // (= snd.una + c->sndbuf.used).
// Unreliable channels carry no meaningful ack numbers; normalize the ack
// so the shared code below treats it as "nothing newly acknowledged".
1763 if(!is_reliable(c)) {
1764 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1765 hdr.ack = c->snd.una;
1769 if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1770 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1772 // Ignore unacceptable RST packets.
1780 // 2. Handle RST packets
1785 if(!(hdr.ctl & ACK)) {
1789 // The peer has refused our connection.
1790 set_state(c, CLOSED);
1791 errno = ECONNREFUSED;
1794 c->recv(c, NULL, 0);
1797 if(c->poll && !c->reapable) {
1808 // We haven't told the application about this connection yet. Silently delete.
1820 // The peer has aborted our connection.
1821 set_state(c, CLOSED);
1825 c->recv(c, NULL, 0);
1828 if(c->poll && !c->reapable) {
1841 // As far as the application is concerned, the connection has already been closed.
1842 // If it has called utcp_close() already, we can immediately free this connection.
1848 // Otherwise, immediately move to the CLOSED state.
1849 set_state(c, CLOSED);
1862 if(!(hdr.ctl & ACK)) {
1867 // 3. Advance snd.una
1869 advanced = seqdiff(hdr.ack, c->snd.una);
// RTT measurement (one sample in flight at a time, Karn's rule applies):
// only use the sample if this ack matches the measured segment exactly.
1873 if(c->rtt_start.tv_sec) {
1874 if(c->rtt_seq == hdr.ack) {
1875 struct timespec now;
1876 clock_gettime(UTCP_CLOCK, &now);
1877 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1878 update_rtt(c, diff);
1879 c->rtt_start.tv_sec = 0;
1880 } else if(c->rtt_seq < hdr.ack) {
1881 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1882 c->rtt_start.tv_sec = 0;
1886 int32_t data_acked = advanced;
1894 // TODO: handle FIN as well.
1899 assert(data_acked >= 0);
1902 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1903 assert(data_acked <= bufused);
1907 buffer_discard(&c->sndbuf, data_acked);
1909 if(is_reliable(c)) {
1914 // Also advance snd.nxt if possible
1915 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1916 c->snd.nxt = hdr.ack;
1919 c->snd.una = hdr.ack;
// Leaving fast recovery: deflate cwnd back to ssthresh (RFC 5681 §3.2).
1922 if(c->dupack >= 3) {
1923 debug(c, "fast recovery ended\n");
1924 c->snd.cwnd = c->snd.ssthresh;
1930 // Increase the congestion window according to RFC 5681
1931 if(c->snd.cwnd < c->snd.ssthresh) {
1932 c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1934 c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1937 if(c->snd.cwnd > c->sndbuf.maxsize) {
1938 c->snd.cwnd = c->sndbuf.maxsize;
1943 // Check if we have sent a FIN that is now ACKed.
1946 if(c->snd.una == c->snd.last) {
1947 set_state(c, FIN_WAIT_2);
1953 if(c->snd.una == c->snd.last) {
1954 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1955 c->conn_timeout.tv_sec += utcp->timeout;
1956 set_state(c, TIME_WAIT);
// Duplicate ACK: no payload, reliable channel, and unacked data outstanding.
1965 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1967 debug(c, "duplicate ACK %d\n", c->dupack);
1969 if(c->dupack == 3) {
1970 // RFC 5681 fast recovery
// NOTE(review): stray `c->dupack` argument — the format string has no
// conversion specifier for it (harmless but should be removed).
1971 debug(c, "fast recovery started\n", c->dupack);
1972 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1973 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1974 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1976 if(c->snd.cwnd > c->sndbuf.maxsize) {
1977 c->snd.cwnd = c->sndbuf.maxsize;
1983 } else if(c->dupack > 3) {
1984 c->snd.cwnd += utcp->mss;
1986 if(c->snd.cwnd > c->sndbuf.maxsize) {
1987 c->snd.cwnd = c->sndbuf.maxsize;
1993 // We got an ACK which indicates the other side did get one of our packets.
1994 // Reset the retransmission timer to avoid going to slow start,
1995 // but don't touch the connection timeout.
1996 start_retransmit_timer(c);
// 4. Update timers: everything acked -> stop; otherwise re-arm both timers.
2003 if(c->snd.una == c->snd.last) {
2004 stop_retransmit_timer(c);
2005 timespec_clear(&c->conn_timeout);
2006 } else if(is_reliable(c)) {
2007 start_retransmit_timer(c);
2008 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2009 c->conn_timeout.tv_sec += utcp->timeout;
2014 // 5. Process SYN stuff
2020 // This is a SYNACK. It should always have ACKed the SYN.
2025 c->rcv.irs = hdr.seq;
2026 c->rcv.nxt = hdr.seq + 1;
2030 set_state(c, FIN_WAIT_1);
2033 set_state(c, ESTABLISHED);
2039 // This is a retransmit of a SYN, send back the SYNACK.
2049 // This could be a retransmission. Ignore the SYN flag, but send an ACK back.
2060 // 6. Process new data
2062 if(c->state == SYN_RECEIVED) {
2063 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
2068 // Are we still LISTENing?
2070 utcp->accept(c, c->src);
2073 if(c->state != ESTABLISHED) {
2074 set_state(c, CLOSED);
2084 // This should never happen.
2099 // Ehm no, We should never receive more data after a FIN.
2109 handle_incoming_data(c, &hdr, ptr, len);
2112 // 7. Process FIN stuff
// For reliable channels only act on a FIN once all data before it arrived.
2114 if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
2118 // This should never happen.
2125 set_state(c, CLOSE_WAIT);
2129 set_state(c, CLOSING);
2133 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2134 c->conn_timeout.tv_sec += utcp->timeout;
2135 set_state(c, TIME_WAIT);
2142 // Ehm, no. We should never receive a second FIN.
2152 // FIN counts as one sequence number
2156 // Inform the application that the peer closed its end of the connection.
2159 c->recv(c, NULL, 0);
2163 // Now we send something back if:
2164 // - we received data, so we have to send back an ACK
2165 // -> sendatleastone = true
2166 // - or we got an ack, so we should maybe send a bit more data
2167 // -> sendatleastone = false
2169 if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
// Reached on fatal validation failures: reflect a RST+ACK at the sender.
2184 hdr.ack = hdr.seq + len;
2186 hdr.ctl = RST | ACK;
2189 print_packet(c, "send", &hdr, sizeof(hdr));
2190 utcp->send(utcp, &hdr, sizeof(hdr));
// Shut down one or both directions of a connection, like shutdown(2).
// `dir` is UTCP_SHUT_RD, UTCP_SHUT_WR or UTCP_SHUT_RDWR; anything else is
// rejected. Shutting down writes queues a FIN and arms the retransmit timer.
// Returns 0 on success, -1 with errno set on error.
2195 int utcp_shutdown(struct utcp_connection *c, int dir) {
2196 debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
2204 debug(c, "shutdown() called on closed connection\n");
2209 if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
2214 // TCP does not have a provision for stopping incoming packets.
2215 // The best we can do is to just ignore them.
2216 if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
2220 // The rest of the code deals with shutting down writes.
2221 if(dir == UTCP_SHUT_RD) {
2225 // Only process shutting down writes once.
// Unreliable framed channels buffer partial frames; push them out before FIN.
2243 if(!is_reliable(c) && is_framed(c)) {
2244 flush_unreliable_framed(c);
2247 set_state(c, FIN_WAIT_1);
2255 set_state(c, CLOSING);
// For unreliable channels the FIN is sent immediately (forced ack).
2266 ack(c, !is_reliable(c));
2268 if(!timespec_isset(&c->rtrx_timeout)) {
2269 start_retransmit_timer(c);
// Abort a connection locally: move it to CLOSED and, when the peer might
// still have state, emit a RST carrying our current snd.nxt.
// Returns false (with errno set) when called on an invalid/closed connection.
2275 static bool reset_connection(struct utcp_connection *c) {
2282 debug(c, "abort() called on closed connection\n");
2299 set_state(c, CLOSED);
2307 set_state(c, CLOSED);
2317 hdr.seq = c->snd.nxt;
2322 print_packet(c, "send", &hdr, sizeof(hdr));
2323 c->utcp->send(c->utcp, &hdr, sizeof(hdr));
2327 // Closes all the opened connections
// Abort every live connection in this context. Callbacks are saved before
// reset_connection() (which may clear them) so the application is still
// notified of the close via its original recv/poll callbacks.
2328 void utcp_abort_all_connections(struct utcp *utcp) {
2334 for(int i = 0; i < utcp->nconnections; i++) {
2335 struct utcp_connection *c = utcp->connections[i];
// Skip connections that are already dead or awaiting reaping.
2337 if(c->reapable || c->state == CLOSED) {
2341 utcp_recv_t old_recv = c->recv;
2342 utcp_poll_t old_poll = c->poll;
2344 reset_connection(c);
// recv(c, NULL, 0) is the "connection closed" signal to the application.
2348 old_recv(c, NULL, 0);
2351 if(old_poll && !c->reapable) {
// Gracefully close a connection. If unread data is still buffered the close
// cannot be graceful, so the connection is reset instead.
// Returns 0 on success, -1 on failure.
2360 int utcp_close(struct utcp_connection *c) {
2361 debug(c, "closing\n");
2363 if(c->rcvbuf.used) {
2364 debug(c, "receive buffer not empty, resetting\n");
2365 return reset_connection(c) ? 0 : -1;
// NOTE(review): this passes POSIX SHUT_RDWR rather than UTCP_SHUT_RDWR —
// it works only if the two constants share the same value; confirm.
2368 if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
// Hard-abort a connection (sends RST, no graceful FIN exchange).
// Returns 0 on success, -1 when reset_connection() rejects the connection.
2378 int utcp_abort(struct utcp_connection *c) {
2379 if(!reset_connection(c)) {
2388 * One call to this function will loop through all connections,
2389 * checking if something needs to be resent or not.
2390 * The return value is the time to the next timeout in milliseconds,
2391 * or maybe a negative value if the timeout is infinite.
// NOTE(review): the comment above says "milliseconds" but the function
// returns a struct timespec (delta until the next timeout) — the doc
// comment looks outdated; confirm against the header's contract.
2393 struct timespec utcp_timeout(struct utcp *utcp) {
2394 struct timespec now;
2395 clock_gettime(UTCP_CLOCK, &now);
// Default horizon: one hour from now if nothing is pending sooner.
2396 struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
2398 for(int i = 0; i < utcp->nconnections; i++) {
2399 struct utcp_connection *c = utcp->connections[i];
2405 // delete connections that have been utcp_close()d.
2406 if(c->state == CLOSED) {
2408 debug(c, "reaping\n");
// Connection (user) timeout expired: notify the application.
2416 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
2421 c->recv(c, NULL, 0);
2424 if(c->poll && !c->reapable) {
2431 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2432 debug(c, "retransmitting after timeout\n");
// Invoke the poll callback when the application asked to be told
// about free send-buffer space (bounded by the frame limit if framed).
2437 if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2439 uint32_t len = is_framed(c) ? min(buffer_free(&c->sndbuf), MAX_UNRELIABLE_SIZE) : buffer_free(&c->sndbuf);
2444 } else if(c->state == CLOSED) {
// Track the earliest pending timeout across all connections.
2449 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2450 next = c->conn_timeout;
2453 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2454 next = c->rtrx_timeout;
2458 struct timespec diff;
2460 timespec_sub(&next, &now, &diff);
// Return true when at least one connection is in a state other than
// CLOSED or TIME_WAIT, i.e. the context still has active work to do.
2465 bool utcp_is_active(struct utcp *utcp) {
2470 for(int i = 0; i < utcp->nconnections; i++)
2471 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
// Create a new UTCP context.
// `accept`/`pre_accept` control incoming connections (either may be NULL),
// `send` is the mandatory callback used to transmit packets, and `priv` is
// an opaque pointer stored for the application.
// Returns the new context, or NULL on allocation failure.
2478 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2484 struct utcp *utcp = calloc(1, sizeof(*utcp));
2490 utcp_set_mtu(utcp, DEFAULT_MTU);
// Measure the clock resolution once, lazily, for all contexts.
2497 if(!CLOCK_GRANULARITY) {
2498 struct timespec res;
2499 clock_getres(UTCP_CLOCK, &res);
2500 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2503 utcp->accept = accept;
2504 utcp->pre_accept = pre_accept;
2507 utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
// Destroy a UTCP context: notify applications of remaining connections,
// free each connection's buffers, then the connection array itself.
2512 void utcp_exit(struct utcp *utcp) {
2517 for(int i = 0; i < utcp->nconnections; i++) {
2518 struct utcp_connection *c = utcp->connections[i];
// Signal "connection closed" to the application before tearing down.
2522 c->recv(c, NULL, 0);
2525 if(c->poll && !c->reapable) {
2530 buffer_exit(&c->rcvbuf);
2531 buffer_exit(&c->sndbuf);
2535 free(utcp->connections);
2540 uint16_t utcp_get_mtu(struct utcp *utcp) {
2541 return utcp ? utcp->mtu : 0;
2544 uint16_t utcp_get_mss(struct utcp *utcp) {
2545 return utcp ? utcp->mss : 0;
// Set the MTU for this context and derive the MSS from it.
// MTUs not larger than the header are rejected. Growing the MTU reallocates
// the shared packet buffer.
2548 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2553 if(mtu <= sizeof(struct hdr)) {
2557 if(mtu > utcp->mtu) {
// NOTE(review): realloc result assigned to a temporary — the failure
// handling lines are elided here; confirm utcp->pkt is only updated on success.
2558 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2568 utcp->mss = mtu - sizeof(struct hdr);
// Reset all per-connection timers, e.g. after network conditions changed:
// pending retransmits fire immediately, connection timeouts restart from
// now + user timeout, RTT measurements are cancelled and the RTO is clamped.
2571 void utcp_reset_timers(struct utcp *utcp) {
2576 struct timespec now, then;
2578 clock_gettime(UTCP_CLOCK, &now);
2582 then.tv_sec += utcp->timeout;
2584 for(int i = 0; i < utcp->nconnections; i++) {
2585 struct utcp_connection *c = utcp->connections[i];
// Make any armed retransmit timer due immediately.
2591 if(timespec_isset(&c->rtrx_timeout)) {
2592 c->rtrx_timeout = now;
2595 if(timespec_isset(&c->conn_timeout)) {
2596 c->conn_timeout = then;
// Cancel the in-flight RTT sample; it would be skewed by the reset.
2599 c->rtt_start.tv_sec = 0;
2601 if(c->rto > START_RTO) {
2607 int utcp_get_user_timeout(struct utcp *u) {
2608 return u ? u->timeout : 0;
// Set the user timeout (seconds) used for connection timeouts.
2611 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2613 u->timeout = timeout;
2617 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2618 return c ? c->sndbuf.maxsize : 0;
// Return the free space in the send buffer; validation of the connection
// and its state happens in the elided lines above the return.
2621 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2631 return buffer_free(&c->sndbuf);
// Set the maximum send buffer size and refresh the poll flag.
2638 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2643 c->sndbuf.maxsize = size;
// maxsize is narrower than size_t; if the assignment truncated the value,
// saturate to the maximum representable size instead.
2645 if(c->sndbuf.maxsize != size) {
2646 c->sndbuf.maxsize = -1;
// Re-evaluate whether the application should be polled for writable space.
2649 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2652 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2653 return c ? c->rcvbuf.maxsize : 0;
// Return the free space in the receive buffer, but only while the
// connection can still receive data (ESTABLISHED or CLOSE_WAIT).
2656 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2657 if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2658 return buffer_free(&c->rcvbuf);
// Set the maximum receive buffer size.
2664 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2669 c->rcvbuf.maxsize = size;
// maxsize is narrower than size_t; saturate on truncation, mirroring
// utcp_set_sndbuf().
2671 if(c->rcvbuf.maxsize != size) {
2672 c->rcvbuf.maxsize = -1;
2676 size_t utcp_get_sendq(struct utcp_connection *c) {
2677 return c->sndbuf.used;
2680 size_t utcp_get_recvq(struct utcp_connection *c) {
2681 return c->rcvbuf.used;
2684 bool utcp_get_nodelay(struct utcp_connection *c) {
2685 return c ? c->nodelay : false;
// Enable or disable delayed sending (Nagle-like coalescing) for this connection.
2688 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2690 c->nodelay = nodelay;
2694 bool utcp_get_keepalive(struct utcp_connection *c) {
2695 return c ? c->keepalive : false;
// Enable or disable keepalive for this connection.
2698 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2700 c->keepalive = keepalive;
2704 size_t utcp_get_outq(struct utcp_connection *c) {
2705 return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
// Install the application's receive callback for this connection.
2708 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
// Install the application's poll callback and immediately re-evaluate
// whether it should be invoked (reliable connection with free send space).
2714 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2717 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
// Install (or replace) the accept and pre-accept callbacks for incoming
// connections on this context.
2721 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2723 utcp->accept = accept;
2724 utcp->pre_accept = pre_accept;
// Tell UTCP whether the application expects incoming data soon.
// Expecting data arms the connection timeout so a dead peer is detected;
// cancelling only clears the timer when no sent data is still unacked.
2728 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2729 if(!c || c->reapable) {
// Only meaningful while data can still arrive.
2733 if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2738 // If we expect data, start the connection timer.
2739 if(!timespec_isset(&c->conn_timeout)) {
2740 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2741 c->conn_timeout.tv_sec += c->utcp->timeout;
2744 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2745 if(c->snd.una == c->snd.last) {
2746 timespec_clear(&c->conn_timeout);
// Mark the whole context as offline/online. Going offline arms connection
// timeouts via utcp_expect_data(), makes pending retransmits due now,
// cancels RTT measurements and clamps the RTO.
2751 void utcp_offline(struct utcp *utcp, bool offline) {
2752 struct timespec now;
2753 clock_gettime(UTCP_CLOCK, &now);
2755 for(int i = 0; i < utcp->nconnections; i++) {
2756 struct utcp_connection *c = utcp->connections[i];
2762 utcp_expect_data(c, offline);
// Fire any armed retransmit timer immediately on the next timeout pass.
2765 if(timespec_isset(&c->rtrx_timeout)) {
2766 c->rtrx_timeout = now;
2769 utcp->connections[i]->rtt_start.tv_sec = 0;
2771 if(c->rto > START_RTO) {
2778 void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
2779 utcp->retransmit = cb;
2782 void utcp_set_clock_granularity(long granularity) {
2783 CLOCK_GRANULARITY = granularity;
2786 int utcp_get_flush_timeout(struct utcp *utcp) {
2787 return utcp->flush_timeout;
2790 void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
2791 utcp->flush_timeout = milliseconds;