]> git.meshlink.io Git - utcp/blob - utcp.c
Add utcp_set_clock_granularity().
[utcp] / utcp.c
1 /*
2     utcp.c -- Userspace TCP
3     Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <time.h>
31
32 #include "utcp_priv.h"
33
34 #ifndef EBADMSG
35 #define EBADMSG         104
36 #endif
37
38 #ifndef SHUT_RDWR
39 #define SHUT_RDWR 2
40 #endif
41
42 #ifdef poll
43 #undef poll
44 #endif
45
46 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
47         r->tv_sec = a->tv_sec - b->tv_sec;
48         r->tv_nsec = a->tv_nsec - b->tv_nsec;
49
50         if(r->tv_nsec < 0) {
51                 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
52         }
53 }
54
// Return (a - b) in microseconds.
// The difference is computed in 64 bits (so a 32-bit time_t cannot overflow the
// seconds-to-nanoseconds multiply) and then saturated to the int32_t range, so
// very large intervals (> ~35 minutes) clamp instead of wrapping.
static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
	int64_t diff = ((int64_t)a->tv_sec - (int64_t)b->tv_sec) * 1000000000
	               + ((int64_t)a->tv_nsec - (int64_t)b->tv_nsec);
	int64_t usec = diff / 1000;

	if(usec > INT32_MAX) {
		return INT32_MAX;
	}

	if(usec < INT32_MIN) {
		return INT32_MIN;
	}

	return (int32_t)usec;
}
59
// Strict "a is earlier than b" comparison: seconds first, nanoseconds as tie-breaker.
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
	return a->tv_sec != b->tv_sec ? a->tv_sec < b->tv_sec : a->tv_nsec < b->tv_nsec;
}
67
// Mark a timer as unset. timespec_isset() only inspects tv_sec, but both fields
// are zeroed so a cleared value is a genuine zero time rather than carrying a
// stale nanosecond part into later comparisons or debug output.
static void timespec_clear(struct timespec *a) {
	a->tv_sec = 0;
	a->tv_nsec = 0;
}
71
// A timer counts as armed whenever its seconds field is non-zero;
// the nanosecond field is deliberately ignored.
static bool timespec_isset(const struct timespec *a) {
	return a->tv_sec != 0;
}
75
// Clock granularity in microseconds. update_rtt() uses it as the lower bound
// for the variance term when computing the RTO (rto = srtt + max(4 * rttvar, G)).
// As a static without an initializer it starts at 0.
// NOTE(review): presumably set via utcp_set_clock_granularity() elsewhere in this file — confirm.
static long CLOCK_GRANULARITY; // usec
77
// Smaller of two sizes.
static inline size_t min(size_t a, size_t b) {
	if(b < a) {
		return b;
	}

	return a;
}
81
// Larger of two sizes.
static inline size_t max(size_t a, size_t b) {
	if(a > b) {
		return a;
	}

	return b;
}
85
// Clock used for the protocol timers (retransmission timeout, connection
// timeout, RTT measurement). The timer code uses it unconditionally, so it must
// be defined whether or not UTCP_DEBUG is set. Builds may override it by
// defining UTCP_CLOCK themselves.
#ifndef UTCP_CLOCK
#if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
#define UTCP_CLOCK CLOCK_MONOTONIC_RAW
#else
#define UTCP_CLOCK CLOCK_MONOTONIC
#endif
#endif

#ifdef UTCP_DEBUG
#include <stdarg.h>

// Maximum number of payload bytes print_packet() dumps in hex.
#ifndef UTCP_DEBUG_DATALEN
#define UTCP_DEBUG_DATALEN 20
#endif

// Print a wall-clock-timestamped message to stderr, prefixed with the
// connection's src:dst ports (0:0 when c is NULL). Messages that do not fit in
// the 1024-byte buffer are truncated rather than silently dropped.
static void debug(struct utcp_connection *c, const char *format, ...) {
	struct timespec tv;
	char buf[1024];
	int len;

	clock_gettime(CLOCK_REALTIME, &tv);
	len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, (unsigned long)(tv.tv_nsec / 1000), c ? c->src : 0, c ? c->dst : 0);

	if(len < 0 || (size_t)len >= sizeof(buf)) {
		return;
	}

	va_list ap;
	va_start(ap, format);
	int msglen = vsnprintf(buf + len, sizeof(buf) - len, format, ap);
	va_end(ap);

	if(msglen < 0) {
		return;
	}

	// vsnprintf() returns the untruncated length; clamp to what was actually stored.
	if((size_t)len + (size_t)msglen >= sizeof(buf)) {
		len = sizeof(buf) - 1;
	} else {
		len += msglen;
	}

	fwrite(buf, len, 1, stderr);
}

// Log a decoded packet header plus up to UTCP_DEBUG_DATALEN payload bytes in hex.
// dir is a short label such as "send" or "rtrx".
static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
	struct hdr hdr;

	if(len < sizeof(hdr)) {
		debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
		return;
	}

	// Copy out the header; pkt may not be suitably aligned for struct hdr.
	memcpy(&hdr, pkt, sizeof(hdr));

	uint32_t datalen;

	if(len > sizeof(hdr)) {
		datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
	} else {
		datalen = 0;
	}

	const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
	char str[datalen * 2 + 1];
	char *p = str;

	for(uint32_t i = 0; i < datalen; i++) {
		*p++ = "0123456789ABCDEF"[data[i] >> 4];
		*p++ = "0123456789ABCDEF"[data[i] & 15];
	}

	*p = 0;

	debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s data %s\n",
	      dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
	      hdr.ctl & SYN ? "SYN" : "",
	      hdr.ctl & RST ? "RST" : "",
	      hdr.ctl & FIN ? "FIN" : "",
	      hdr.ctl & ACK ? "ACK" : "",
	      str
	     );
}

// Log the congestion window; an all-ones ssthresh (the initial "unset" value) prints as 0.
static void debug_cwnd(struct utcp_connection *c) {
	debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
}
#else
#define debug(...) do {} while(0)
#define print_packet(...) do {} while(0)
#define debug_cwnd(...) do {} while(0)
#endif
166
167 static void set_state(struct utcp_connection *c, enum state state) {
168         c->state = state;
169
170         if(state == ESTABLISHED) {
171                 timespec_clear(&c->conn_timeout);
172         }
173
174         debug(c, "state %s\n", strstate[state]);
175 }
176
177 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
178         if(seq != c->snd.last) {
179                 return false;
180         }
181
182         switch(c->state) {
183         case FIN_WAIT_1:
184         case CLOSING:
185         case LAST_ACK:
186                 return true;
187
188         default:
189                 return false;
190         }
191 }
192
193 static bool is_reliable(struct utcp_connection *c) {
194         return c->flags & UTCP_RELIABLE;
195 }
196
// Signed distance from sequence number b to a, modulo 2^32, so values that
// wrapped around still compare correctly when they are less than 2^31 apart.
static int32_t seqdiff(uint32_t a, uint32_t b) {
	uint32_t delta = a - b;
	return (int32_t)delta;
}
200
201 // Buffer functions
202 static bool buffer_wraps(struct buffer *buf) {
203         return buf->size - buf->offset < buf->used;
204 }
205
206 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
207         char *newdata = realloc(buf->data, newsize);
208
209         if(!newdata) {
210                 return false;
211         }
212
213         buf->data = newdata;
214
215         if(buffer_wraps(buf)) {
216                 // Shift the right part of the buffer until it hits the end of the new buffer.
217                 // Old situation:
218                 // [345......012]
219                 // New situation:
220                 // [345.........|........012]
221                 uint32_t tailsize = buf->size - buf->offset;
222                 uint32_t newoffset = newsize - tailsize;
223                 memmove(buf + newoffset, buf + buf->offset, tailsize);
224                 buf->offset = newoffset;
225         }
226
227         buf->size = newsize;
228         return true;
229 }
230
// Store data into the buffer.
// Copies len bytes from data to logical position offset, measured from the
// start of the buffered data, growing the storage if needed. At most
// buf->maxsize bytes are held in total, so the write may be truncated.
// Returns the number of bytes stored: 0 if offset is already at or beyond
// maxsize, or -1 if growing the storage failed.
static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
	debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);

	// Ensure we don't store more than maxsize bytes in total
	size_t required = offset + len;

	if(required > buf->maxsize) {
		if(offset >= buf->maxsize) {
			return 0;
		}

		len = buf->maxsize - offset;
		required = buf->maxsize;
	}

	// Check if we need to resize the buffer
	if(required > buf->size) {
		size_t newsize = buf->size;

		if(!newsize) {
			newsize = 4096;
		}

		// Double until the write fits. Starting from empty, the first growth
		// therefore yields at least 8192 bytes (before the maxsize cap below).
		do {
			newsize *= 2;
		} while(newsize < required);

		// required <= maxsize here, so capping still leaves enough room.
		if(newsize > buf->maxsize) {
			newsize = buf->maxsize;
		}

		if(!buffer_resize(buf, newsize)) {
			return -1;
		}
	}

	// Translate the logical offset into a physical index into buf->data.
	uint32_t realoffset = buf->offset + offset;

	if(buf->size - buf->offset < offset) {
		// The offset wrapped
		realoffset -= buf->size;
	}

	// NOTE(review): when offset == buf->size - buf->offset exactly, realoffset ends
	// up equal to buf->size (one past the end) instead of 0. The wrapped branch below
	// then copies 0 bytes at the end and everything from the start, which is still correct.
	if(buf->size - realoffset < len) {
		// The new chunk of data must be wrapped
		memcpy(buf->data + realoffset, data, buf->size - realoffset);
		memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
	} else {
		memcpy(buf->data + realoffset, data, len);
	}

	// Writes past the current end extend the used region; writes inside it
	// (e.g. filling a gap left by out-of-order data) leave it unchanged.
	if(required > buf->used) {
		buf->used = required;
	}

	return len;
}
289
290 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
291         return buffer_put_at(buf, buf->used, data, len);
292 }
293
294 // Copy data from the buffer without removing it.
295 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
296         // Ensure we don't copy more than is actually stored in the buffer
297         if(offset >= buf->used) {
298                 return 0;
299         }
300
301         if(buf->used - offset < len) {
302                 len = buf->used - offset;
303         }
304
305         uint32_t realoffset = buf->offset + offset;
306
307         if(buf->size - buf->offset < offset) {
308                 // The offset wrapped
309                 realoffset -= buf->size;
310         }
311
312         if(buf->size - realoffset < len) {
313                 // The data is wrapped
314                 memcpy(data, buf->data + realoffset, buf->size - realoffset);
315                 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
316         } else {
317                 memcpy(data, buf->data + realoffset, len);
318         }
319
320         return len;
321 }
322
323 // Discard data from the buffer.
324 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
325         if(buf->used < len) {
326                 len = buf->used;
327         }
328
329         if(buf->size - buf->offset < len) {
330                 buf->offset -= buf->size;
331         }
332
333         buf->offset += len;
334         buf->used -= len;
335
336         return len;
337 }
338
339 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
340         if(maxsize < minsize) {
341                 maxsize = minsize;
342         }
343
344         buf->maxsize = maxsize;
345
346         return buf->size >= minsize || buffer_resize(buf, minsize);
347 }
348
349 static void buffer_exit(struct buffer *buf) {
350         free(buf->data);
351         memset(buf, 0, sizeof(*buf));
352 }
353
354 static uint32_t buffer_free(const struct buffer *buf) {
355         return buf->maxsize - buf->used;
356 }
357
358 // Connections are stored in a sorted list.
359 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
360
361 static int compare(const void *va, const void *vb) {
362         assert(va && vb);
363
364         const struct utcp_connection *a = *(struct utcp_connection **)va;
365         const struct utcp_connection *b = *(struct utcp_connection **)vb;
366
367         assert(a && b);
368         assert(a->src && b->src);
369
370         int c = (int)a->src - (int)b->src;
371
372         if(c) {
373                 return c;
374         }
375
376         c = (int)a->dst - (int)b->dst;
377         return c;
378 }
379
380 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
381         if(!utcp->nconnections) {
382                 return NULL;
383         }
384
385         struct utcp_connection key = {
386                 .src = src,
387                 .dst = dst,
388         }, *keyp = &key;
389         struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
390         return match ? *match : NULL;
391 }
392
393 static void free_connection(struct utcp_connection *c) {
394         struct utcp *utcp = c->utcp;
395         struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
396
397         assert(cp);
398
399         int i = cp - utcp->connections;
400         memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
401         utcp->nconnections--;
402
403         buffer_exit(&c->rcvbuf);
404         buffer_exit(&c->sndbuf);
405         free(c);
406 }
407
408 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
409         // Check whether this combination of src and dst is free
410
411         if(src) {
412                 if(find_connection(utcp, src, dst)) {
413                         errno = EADDRINUSE;
414                         return NULL;
415                 }
416         } else { // If src == 0, generate a random port number with the high bit set
417                 if(utcp->nconnections >= 32767) {
418                         errno = ENOMEM;
419                         return NULL;
420                 }
421
422                 src = rand() | 0x8000;
423
424                 while(find_connection(utcp, src, dst)) {
425                         src++;
426                 }
427         }
428
429         // Allocate memory for the new connection
430
431         if(utcp->nconnections >= utcp->nallocated) {
432                 if(!utcp->nallocated) {
433                         utcp->nallocated = 4;
434                 } else {
435                         utcp->nallocated *= 2;
436                 }
437
438                 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
439
440                 if(!new_array) {
441                         return NULL;
442                 }
443
444                 utcp->connections = new_array;
445         }
446
447         struct utcp_connection *c = calloc(1, sizeof(*c));
448
449         if(!c) {
450                 return NULL;
451         }
452
453         if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
454                 free(c);
455                 return NULL;
456         }
457
458         if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
459                 buffer_exit(&c->sndbuf);
460                 free(c);
461                 return NULL;
462         }
463
464         // Fill in the details
465
466         c->src = src;
467         c->dst = dst;
468 #ifdef UTCP_DEBUG
469         c->snd.iss = 0;
470 #else
471         c->snd.iss = rand();
472 #endif
473         c->snd.una = c->snd.iss;
474         c->snd.nxt = c->snd.iss + 1;
475         c->snd.last = c->snd.nxt;
476         c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
477         c->snd.ssthresh = ~0;
478         debug_cwnd(c);
479         c->utcp = utcp;
480
481         // Add it to the sorted list of connections
482
483         utcp->connections[utcp->nconnections++] = c;
484         qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
485
486         return c;
487 }
488
// Absolute difference of two unsigned 32-bit values, computed without wrap-around.
static inline uint32_t absdiff(uint32_t a, uint32_t b) {
	return (a > b) ? a - b : b - a;
}
496
497 // Update RTT variables. See RFC 6298.
498 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
499         if(!rtt) {
500                 debug(c, "invalid rtt\n");
501                 return;
502         }
503
504         struct utcp *utcp = c->utcp;
505
506         if(!utcp->srtt) {
507                 utcp->srtt = rtt;
508                 utcp->rttvar = rtt / 2;
509         } else {
510                 utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4;
511                 utcp->srtt = (utcp->srtt * 7 + rtt) / 8;
512         }
513
514         utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY);
515
516         if(utcp->rto > MAX_RTO) {
517                 utcp->rto = MAX_RTO;
518         }
519
520         debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, utcp->srtt, utcp->rttvar, utcp->rto);
521 }
522
523 static void start_retransmit_timer(struct utcp_connection *c) {
524         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
525
526         uint32_t rto = c->utcp->rto;
527
528         while(rto > USEC_PER_SEC) {
529                 c->rtrx_timeout.tv_sec++;
530                 rto -= USEC_PER_SEC;
531         }
532
533         c->rtrx_timeout.tv_nsec += c->utcp->rto * 1000;
534
535         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
536                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
537                 c->rtrx_timeout.tv_sec++;
538         }
539
540         debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
541 }
542
543 static void stop_retransmit_timer(struct utcp_connection *c) {
544         timespec_clear(&c->rtrx_timeout);
545         debug(c, "rtrx_timeout cleared\n");
546 }
547
548 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
549         struct utcp_connection *c = allocate_connection(utcp, 0, dst);
550
551         if(!c) {
552                 return NULL;
553         }
554
555         assert((flags & ~0x1f) == 0);
556
557         c->flags = flags;
558         c->recv = recv;
559         c->priv = priv;
560
561         struct {
562                 struct hdr hdr;
563                 uint8_t init[4];
564         } pkt;
565
566         pkt.hdr.src = c->src;
567         pkt.hdr.dst = c->dst;
568         pkt.hdr.seq = c->snd.iss;
569         pkt.hdr.ack = 0;
570         pkt.hdr.wnd = c->rcvbuf.maxsize;
571         pkt.hdr.ctl = SYN;
572         pkt.hdr.aux = 0x0101;
573         pkt.init[0] = 1;
574         pkt.init[1] = 0;
575         pkt.init[2] = 0;
576         pkt.init[3] = flags & 0x7;
577
578         set_state(c, SYN_SENT);
579
580         print_packet(c, "send", &pkt, sizeof(pkt));
581         utcp->send(utcp, &pkt, sizeof(pkt));
582
583         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
584         c->conn_timeout.tv_sec += utcp->timeout;
585
586         start_retransmit_timer(c);
587
588         return c;
589 }
590
591 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
592         return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
593 }
594
595 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
596         if(c->reapable || c->state != SYN_RECEIVED) {
597                 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
598                 return;
599         }
600
601         debug(c, "accepted %p %p\n", c, recv, priv);
602         c->recv = recv;
603         c->priv = priv;
604         set_state(c, ESTABLISHED);
605 }
606
// Send queued data starting at snd.nxt, as much as the congestion window and
// the peer's advertised window allow, in segments of at most one MSS. Every
// packet carries an ACK for rcv.nxt. If sendatleastone is true, at least one
// packet is sent even when no data fits (then it is a bare ACK).
static void ack(struct utcp_connection *c, bool sendatleastone) {
	// Bytes queued but not yet sent.
	int32_t left = seqdiff(c->snd.last, c->snd.nxt);
	// Room left in min(cwnd, peer window) after the bytes already in flight (nxt - una).
	// NOTE(review): min() returns size_t, so this is computed unsigned and converted
	// back to int32_t; a negative result relies on the usual two's-complement conversion.
	int32_t cwndleft = min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una);

	assert(left >= 0);

	if(cwndleft <= 0) {
		left = 0;
	} else if(cwndleft < left) {
		left = cwndleft;

		// Only send whole MSS-sized segments, unless the caller insists on sending
		// something and no more than one MSS fits.
		if(!sendatleastone || cwndleft > c->utcp->mss) {
			left -= left % c->utcp->mss;
		}
	}

	debug(c, "cwndleft %d left %d\n", cwndleft, left);

	if(!left && !sendatleastone) {
		return;
	}

	// NOTE(review): uses the shared scratch buffer c->utcp->pkt, presumably sized for a
	// header plus one MSS of payload — confirm where it is allocated.
	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.ack = c->rcv.nxt;
	pkt->hdr.wnd = c->rcvbuf.maxsize;
	pkt->hdr.ctl = ACK;
	pkt->hdr.aux = 0;

	// do/while so that a zero-length packet still goes out when sendatleastone is set.
	do {
		uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
		pkt->hdr.seq = c->snd.nxt;

		// The send buffer starts at snd.una, so the copy offset is nxt - una.
		buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);

		c->snd.nxt += seglen;
		left -= seglen;

		// If this segment ends at snd.last while a FIN is wanted, its final sequence
		// number is used for the FIN: drop it from the payload length and set FIN.
		if(seglen && fin_wanted(c, c->snd.nxt)) {
			seglen--;
			pkt->hdr.ctl |= FIN;
		}

		// rtt_start.tv_sec == 0 means no RTT measurement is currently running.
		if(!c->rtt_start.tv_sec) {
			// Start RTT measurement
			clock_gettime(UTCP_CLOCK, &c->rtt_start);
			c->rtt_seq = pkt->hdr.seq + seglen;
			debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
		}

		print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
		c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
	} while(left);
}
666
667 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
668         if(c->reapable) {
669                 debug(c, "send() called on closed connection\n");
670                 errno = EBADF;
671                 return -1;
672         }
673
674         switch(c->state) {
675         case CLOSED:
676         case LISTEN:
677                 debug(c, "send() called on unconnected connection\n");
678                 errno = ENOTCONN;
679                 return -1;
680
681         case SYN_SENT:
682         case SYN_RECEIVED:
683         case ESTABLISHED:
684         case CLOSE_WAIT:
685                 break;
686
687         case FIN_WAIT_1:
688         case FIN_WAIT_2:
689         case CLOSING:
690         case LAST_ACK:
691         case TIME_WAIT:
692                 debug(c, "send() called on closed connection\n");
693                 errno = EPIPE;
694                 return -1;
695         }
696
697         // Exit early if we have nothing to send.
698
699         if(!len) {
700                 return 0;
701         }
702
703         if(!data) {
704                 errno = EFAULT;
705                 return -1;
706         }
707
708         // Check if we need to be able to buffer all data
709
710         if(c->flags & UTCP_NO_PARTIAL) {
711                 if(len > buffer_free(&c->sndbuf)) {
712                         if(len > c->sndbuf.maxsize) {
713                                 errno = EMSGSIZE;
714                                 return -1;
715                         } else {
716                                 errno = EWOULDBLOCK;
717                                 return 0;
718                         }
719                 }
720         }
721
722         // Add data to send buffer.
723
724         if(is_reliable(c) || (c->state != SYN_SENT && c->state != SYN_RECEIVED)) {
725                 len = buffer_put(&c->sndbuf, data, len);
726         } else {
727                 return 0;
728         }
729
730         if(len <= 0) {
731                 if(is_reliable(c)) {
732                         errno = EWOULDBLOCK;
733                         return 0;
734                 } else {
735                         return len;
736                 }
737         }
738
739         c->snd.last += len;
740
741         // Don't send anything yet if the connection has not fully established yet
742
743         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
744                 return len;
745         }
746
747         ack(c, false);
748
749         if(!is_reliable(c)) {
750                 c->snd.una = c->snd.nxt = c->snd.last;
751                 buffer_discard(&c->sndbuf, c->sndbuf.used);
752         }
753
754         if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) {
755                 start_retransmit_timer(c);
756         }
757
758         if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) {
759                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
760                 c->conn_timeout.tv_sec += c->utcp->timeout;
761         }
762
763         return len;
764 }
765
766 static void swap_ports(struct hdr *hdr) {
767         uint16_t tmp = hdr->src;
768         hdr->src = hdr->dst;
769         hdr->dst = tmp;
770 }
771
772 static void fast_retransmit(struct utcp_connection *c) {
773         if(c->state == CLOSED || c->snd.last == c->snd.una) {
774                 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
775                 return;
776         }
777
778         struct utcp *utcp = c->utcp;
779
780         struct {
781                 struct hdr hdr;
782                 uint8_t data[];
783         } *pkt;
784
785         pkt = malloc(c->utcp->mtu);
786
787         if(!pkt) {
788                 return;
789         }
790
791         pkt->hdr.src = c->src;
792         pkt->hdr.dst = c->dst;
793         pkt->hdr.wnd = c->rcvbuf.maxsize;
794         pkt->hdr.aux = 0;
795
796         switch(c->state) {
797         case ESTABLISHED:
798         case FIN_WAIT_1:
799         case CLOSE_WAIT:
800         case CLOSING:
801         case LAST_ACK:
802                 // Send unacked data again.
803                 pkt->hdr.seq = c->snd.una;
804                 pkt->hdr.ack = c->rcv.nxt;
805                 pkt->hdr.ctl = ACK;
806                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
807
808                 if(fin_wanted(c, c->snd.una + len)) {
809                         len--;
810                         pkt->hdr.ctl |= FIN;
811                 }
812
813                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
814                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
815                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
816                 break;
817
818         default:
819                 break;
820         }
821
822         free(pkt);
823 }
824
// Retransmission-timer expiry handler. Depending on the state it resends the
// SYN, the SYN+ACK, or the oldest unacknowledged segment. For data it also
// applies RFC 5681 slow start. It then re-arms the timer with a doubled RTO
// (capped at MAX_RTO), discards any running RTT sample, and resets the
// duplicate-ACK counter.
static void retransmit(struct utcp_connection *c) {
	if(c->state == CLOSED || c->snd.last == c->snd.una) {
		debug(c, "retransmit() called but nothing to retransmit!\n");
		stop_retransmit_timer(c);
		return;
	}

	struct utcp *utcp = c->utcp;

	// Reuses the shared scratch packet buffer.
	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.wnd = c->rcvbuf.maxsize;
	pkt->hdr.aux = 0;

	switch(c->state) {
	case SYN_SENT:
		// Send our SYN again
		// Same 4-byte init block as utcp_connect_ex(): version 1 plus the low three flag bits.
		pkt->hdr.seq = c->snd.iss;
		pkt->hdr.ack = 0;
		pkt->hdr.ctl = SYN;
		pkt->hdr.aux = 0x0101;
		pkt->data[0] = 1;
		pkt->data[1] = 0;
		pkt->data[2] = 0;
		pkt->data[3] = c->flags & 0x7;
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
		break;

	case SYN_RECEIVED:
		// Send SYNACK again
		pkt->hdr.seq = c->snd.nxt;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = SYN | ACK;
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
		utcp->send(utcp, pkt, sizeof(pkt->hdr));
		break;

	case ESTABLISHED:
	case FIN_WAIT_1:
	case CLOSE_WAIT:
	case CLOSING:
	case LAST_ACK:
		// Send unacked data again.
		pkt->hdr.seq = c->snd.una;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = ACK;
		uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

		if(fin_wanted(c, c->snd.una + len)) {
			len--;
			pkt->hdr.ctl |= FIN;
		}

		// RFC 5681 slow start after timeout
		uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
		c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
		c->snd.cwnd = utcp->mss;
		debug_cwnd(c);

		buffer_copy(&c->sndbuf, pkt->data, 0, len);
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);

		// Rewind snd.nxt to just after the resent segment; later data is sent again by ack().
		c->snd.nxt = c->snd.una + len;
		break;

	case CLOSED:
	case LISTEN:
	case TIME_WAIT:
	case FIN_WAIT_2:
		// We shouldn't need to retransmit anything in this state.
#ifdef UTCP_DEBUG
		abort();
#endif
		stop_retransmit_timer(c);
		goto cleanup;
	}

	// Re-arm with the current RTO, then double it for the next expiry (capped).
	start_retransmit_timer(c);
	utcp->rto *= 2;

	if(utcp->rto > MAX_RTO) {
		utcp->rto = MAX_RTO;
	}

	c->rtt_start.tv_sec = 0; // invalidate RTT timer
	c->dupack = 0; // cancel any ongoing fast recovery

cleanup:
	return;
}
922
923 /* Update receive buffer and SACK entries after consuming data.
924  *
925  * Situation:
926  *
927  * |.....0000..1111111111.....22222......3333|
928  * |---------------^
929  *
930  * 0..3 represent the SACK entries. The ^ indicates up to which point we want
931  * to remove data from the receive buffer. The idea is to substract "len"
932  * from the offset of all the SACK entries, and then remove/cut down entries
933  * that are shifted to before the start of the receive buffer.
934  *
935  * There are three cases:
936  * - the SACK entry is after ^, in that case just change the offset.
937  * - the SACK entry starts before and ends after ^, so we have to
938  *   change both its offset and size.
939  * - the SACK entry is completely before ^, in that case delete it.
940  */
941 static void sack_consume(struct utcp_connection *c, size_t len) {
942         debug(c, "sack_consume %lu\n", (unsigned long)len);
943
944         if(len > c->rcvbuf.used) {
945                 debug(c, "all SACK entries consumed\n");
946                 c->sacks[0].len = 0;
947                 return;
948         }
949
950         buffer_discard(&c->rcvbuf, len);
951
952         for(int i = 0; i < NSACKS && c->sacks[i].len;) {
953                 if(len < c->sacks[i].offset) {
954                         c->sacks[i].offset -= len;
955                         i++;
956                 } else if(len < c->sacks[i].offset + c->sacks[i].len) {
957                         c->sacks[i].len -= len - c->sacks[i].offset;
958                         c->sacks[i].offset = 0;
959                         i++;
960                 } else {
961                         if(i < NSACKS - 1) {
962                                 memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
963                                 c->sacks[NSACKS - 1].len = 0;
964                         } else {
965                                 c->sacks[i].len = 0;
966                                 break;
967                         }
968                 }
969         }
970
971         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
972                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
973         }
974 }
975
976 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
977         debug(c, "out of order packet, offset %u\n", offset);
978         // Packet loss or reordering occured. Store the data in the buffer.
979         ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
980
981         if(rxd < 0 || (size_t)rxd < len) {
982                 abort();
983         }
984
985         // Make note of where we put it.
986         for(int i = 0; i < NSACKS; i++) {
987                 if(!c->sacks[i].len) { // nothing to merge, add new entry
988                         debug(c, "new SACK entry %d\n", i);
989                         c->sacks[i].offset = offset;
990                         c->sacks[i].len = rxd;
991                         break;
992                 } else if(offset < c->sacks[i].offset) {
993                         if(offset + rxd < c->sacks[i].offset) { // insert before
994                                 if(!c->sacks[NSACKS - 1].len) { // only if room left
995                                         debug(c, "insert SACK entry at %d\n", i);
996                                         memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
997                                         c->sacks[i].offset = offset;
998                                         c->sacks[i].len = rxd;
999                                 } else {
1000                                         debug(c, "SACK entries full, dropping packet\n");
1001                                 }
1002
1003                                 break;
1004                         } else { // merge
1005                                 debug(c, "merge with start of SACK entry at %d\n", i);
1006                                 c->sacks[i].offset = offset;
1007                                 break;
1008                         }
1009                 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1010                         if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1011                                 debug(c, "merge with end of SACK entry at %d\n", i);
1012                                 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1013                                 // TODO: handle potential merge with next entry
1014                         }
1015
1016                         break;
1017                 }
1018         }
1019
1020         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1021                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1022         }
1023 }
1024
1025 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1026         // Check if we can process out-of-order data now.
1027         if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK
1028                 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1029                 buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value
1030                 len = max(len, c->sacks[0].offset + c->sacks[0].len);
1031                 data = c->rcvbuf.data;
1032         }
1033
1034         if(c->recv) {
1035                 ssize_t rxd = c->recv(c, data, len);
1036
1037                 if(rxd < 0 || (size_t)rxd != len) {
1038                         // TODO: handle the application not accepting all data.
1039                         abort();
1040                 }
1041         }
1042
1043         if(c->rcvbuf.used) {
1044                 sack_consume(c, len);
1045         }
1046
1047         c->rcv.nxt += len;
1048 }
1049
1050
1051 static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) {
1052         if(!is_reliable(c)) {
1053                 c->recv(c, data, len);
1054                 c->rcv.nxt = seq + len;
1055                 return;
1056         }
1057
1058         uint32_t offset = seqdiff(seq, c->rcv.nxt);
1059
1060         if(offset + len > c->rcvbuf.maxsize) {
1061                 abort();
1062         }
1063
1064         if(offset) {
1065                 handle_out_of_order(c, offset, data, len);
1066         } else {
1067                 handle_in_order(c, data, len);
1068         }
1069 }
1070
1071
1072 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1073         const uint8_t *ptr = data;
1074
1075         if(!utcp) {
1076                 errno = EFAULT;
1077                 return -1;
1078         }
1079
1080         if(!len) {
1081                 return 0;
1082         }
1083
1084         if(!data) {
1085                 errno = EFAULT;
1086                 return -1;
1087         }
1088
1089         // Drop packets smaller than the header
1090
1091         struct hdr hdr;
1092
1093         if(len < sizeof(hdr)) {
1094                 print_packet(NULL, "recv", data, len);
1095                 errno = EBADMSG;
1096                 return -1;
1097         }
1098
1099         // Make a copy from the potentially unaligned data to a struct hdr
1100
1101         memcpy(&hdr, ptr, sizeof(hdr));
1102
1103         // Try to match the packet to an existing connection
1104
1105         struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1106         print_packet(c, "recv", data, len);
1107
1108         // Process the header
1109
1110         ptr += sizeof(hdr);
1111         len -= sizeof(hdr);
1112
1113         // Drop packets with an unknown CTL flag
1114
1115         if(hdr.ctl & ~(SYN | ACK | RST | FIN)) {
1116                 print_packet(NULL, "recv", data, len);
1117                 errno = EBADMSG;
1118                 return -1;
1119         }
1120
1121         // Check for auxiliary headers
1122
1123         const uint8_t *init = NULL;
1124
1125         uint16_t aux = hdr.aux;
1126
1127         while(aux) {
1128                 size_t auxlen = 4 * (aux >> 8) & 0xf;
1129                 uint8_t auxtype = aux & 0xff;
1130
1131                 if(len < auxlen) {
1132                         errno = EBADMSG;
1133                         return -1;
1134                 }
1135
1136                 switch(auxtype) {
1137                 case AUX_INIT:
1138                         if(!(hdr.ctl & SYN) || auxlen != 4) {
1139                                 errno = EBADMSG;
1140                                 return -1;
1141                         }
1142
1143                         init = ptr;
1144                         break;
1145
1146                 default:
1147                         errno = EBADMSG;
1148                         return -1;
1149                 }
1150
1151                 len -= auxlen;
1152                 ptr += auxlen;
1153
1154                 if(!(aux & 0x800)) {
1155                         break;
1156                 }
1157
1158                 if(len < 2) {
1159                         errno = EBADMSG;
1160                         return -1;
1161                 }
1162
1163                 memcpy(&aux, ptr, 2);
1164                 len -= 2;
1165                 ptr += 2;
1166         }
1167
1168         bool has_data = len || (hdr.ctl & (SYN | FIN));
1169
1170         // Is it for a new connection?
1171
1172         if(!c) {
1173                 // Ignore RST packets
1174
1175                 if(hdr.ctl & RST) {
1176                         return 0;
1177                 }
1178
1179                 // Is it a SYN packet and are we LISTENing?
1180
1181                 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1182                         // If we don't want to accept it, send a RST back
1183                         if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1184                                 len = 1;
1185                                 goto reset;
1186                         }
1187
1188                         // Try to allocate memory, otherwise send a RST back
1189                         c = allocate_connection(utcp, hdr.dst, hdr.src);
1190
1191                         if(!c) {
1192                                 len = 1;
1193                                 goto reset;
1194                         }
1195
1196                         // Parse auxilliary information
1197                         if(init) {
1198                                 if(init[0] < 1) {
1199                                         len = 1;
1200                                         goto reset;
1201                                 }
1202
1203                                 c->flags = init[3] & 0x7;
1204                         } else {
1205                                 c->flags = UTCP_TCP;
1206                         }
1207
1208 synack:
1209                         // Return SYN+ACK, go to SYN_RECEIVED state
1210                         c->snd.wnd = hdr.wnd;
1211                         c->rcv.irs = hdr.seq;
1212                         c->rcv.nxt = c->rcv.irs + 1;
1213                         set_state(c, SYN_RECEIVED);
1214
1215                         struct {
1216                                 struct hdr hdr;
1217                                 uint8_t data[4];
1218                         } pkt;
1219
1220                         pkt.hdr.src = c->src;
1221                         pkt.hdr.dst = c->dst;
1222                         pkt.hdr.ack = c->rcv.irs + 1;
1223                         pkt.hdr.seq = c->snd.iss;
1224                         pkt.hdr.wnd = c->rcvbuf.maxsize;
1225                         pkt.hdr.ctl = SYN | ACK;
1226
1227                         if(init) {
1228                                 pkt.hdr.aux = 0x0101;
1229                                 pkt.data[0] = 1;
1230                                 pkt.data[1] = 0;
1231                                 pkt.data[2] = 0;
1232                                 pkt.data[3] = c->flags & 0x7;
1233                                 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1234                                 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1235                         } else {
1236                                 pkt.hdr.aux = 0;
1237                                 print_packet(c, "send", &pkt, sizeof(hdr));
1238                                 utcp->send(utcp, &pkt, sizeof(hdr));
1239                         }
1240                 } else {
1241                         // No, we don't want your packets, send a RST back
1242                         len = 1;
1243                         goto reset;
1244                 }
1245
1246                 return 0;
1247         }
1248
1249         debug(c, "state %s\n", strstate[c->state]);
1250
1251         // In case this is for a CLOSED connection, ignore the packet.
1252         // TODO: make it so incoming packets can never match a CLOSED connection.
1253
1254         if(c->state == CLOSED) {
1255                 debug(c, "got packet for closed connection\n");
1256                 return 0;
1257         }
1258
1259         // It is for an existing connection.
1260
1261         // 1. Drop invalid packets.
1262
1263         // 1a. Drop packets that should not happen in our current state.
1264
1265         switch(c->state) {
1266         case SYN_SENT:
1267         case SYN_RECEIVED:
1268         case ESTABLISHED:
1269         case FIN_WAIT_1:
1270         case FIN_WAIT_2:
1271         case CLOSE_WAIT:
1272         case CLOSING:
1273         case LAST_ACK:
1274         case TIME_WAIT:
1275                 break;
1276
1277         default:
1278 #ifdef UTCP_DEBUG
1279                 abort();
1280 #endif
1281                 break;
1282         }
1283
1284         // 1b. Discard data that is not in our receive window.
1285
1286         if(is_reliable(c)) {
1287                 bool acceptable;
1288
1289                 if(c->state == SYN_SENT) {
1290                         acceptable = true;
1291                 } else if(len == 0) {
1292                         acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1293                 } else {
1294                         int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1295
1296                         // cut already accepted front overlapping
1297                         if(rcv_offset < 0) {
1298                                 acceptable = len > (size_t) - rcv_offset;
1299
1300                                 if(acceptable) {
1301                                         ptr -= rcv_offset;
1302                                         len += rcv_offset;
1303                                         hdr.seq -= rcv_offset;
1304                                 }
1305                         } else {
1306                                 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1307                         }
1308                 }
1309
1310                 if(!acceptable) {
1311                         debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1312
1313                         // Ignore unacceptable RST packets.
1314                         if(hdr.ctl & RST) {
1315                                 return 0;
1316                         }
1317
1318                         // Otherwise, continue processing.
1319                         len = 0;
1320                 }
1321         } else {
1322 #if UTCP_DEBUG
1323                 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1324
1325                 if(rcv_offset) {
1326                         debug(c, "packet out of order, offset %u bytes", rcv_offset);
1327                 }
1328
1329                 if(rcv_offset >= 0) {
1330                         c->rcv.nxt = hdr.seq + len;
1331                 }
1332
1333 #endif
1334         }
1335
1336         c->snd.wnd = hdr.wnd; // TODO: move below
1337
1338         // 1c. Drop packets with an invalid ACK.
1339         // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1340         // (= snd.una + c->sndbuf.used).
1341
1342         if(!is_reliable(c)) {
1343                 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1344                         hdr.ack = c->snd.una;
1345                 }
1346         }
1347
1348         if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1349                 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1350
1351                 // Ignore unacceptable RST packets.
1352                 if(hdr.ctl & RST) {
1353                         return 0;
1354                 }
1355
1356                 goto reset;
1357         }
1358
1359         // 2. Handle RST packets
1360
1361         if(hdr.ctl & RST) {
1362                 switch(c->state) {
1363                 case SYN_SENT:
1364                         if(!(hdr.ctl & ACK)) {
1365                                 return 0;
1366                         }
1367
1368                         // The peer has refused our connection.
1369                         set_state(c, CLOSED);
1370                         errno = ECONNREFUSED;
1371
1372                         if(c->recv) {
1373                                 c->recv(c, NULL, 0);
1374                         }
1375
1376                         if(c->poll && !c->reapable) {
1377                                 c->poll(c, 0);
1378                         }
1379
1380                         return 0;
1381
1382                 case SYN_RECEIVED:
1383                         if(hdr.ctl & ACK) {
1384                                 return 0;
1385                         }
1386
1387                         // We haven't told the application about this connection yet. Silently delete.
1388                         free_connection(c);
1389                         return 0;
1390
1391                 case ESTABLISHED:
1392                 case FIN_WAIT_1:
1393                 case FIN_WAIT_2:
1394                 case CLOSE_WAIT:
1395                         if(hdr.ctl & ACK) {
1396                                 return 0;
1397                         }
1398
1399                         // The peer has aborted our connection.
1400                         set_state(c, CLOSED);
1401                         errno = ECONNRESET;
1402
1403                         if(c->recv) {
1404                                 c->recv(c, NULL, 0);
1405                         }
1406
1407                         if(c->poll && !c->reapable) {
1408                                 c->poll(c, 0);
1409                         }
1410
1411                         return 0;
1412
1413                 case CLOSING:
1414                 case LAST_ACK:
1415                 case TIME_WAIT:
1416                         if(hdr.ctl & ACK) {
1417                                 return 0;
1418                         }
1419
1420                         // As far as the application is concerned, the connection has already been closed.
1421                         // If it has called utcp_close() already, we can immediately free this connection.
1422                         if(c->reapable) {
1423                                 free_connection(c);
1424                                 return 0;
1425                         }
1426
1427                         // Otherwise, immediately move to the CLOSED state.
1428                         set_state(c, CLOSED);
1429                         return 0;
1430
1431                 default:
1432 #ifdef UTCP_DEBUG
1433                         abort();
1434 #endif
1435                         break;
1436                 }
1437         }
1438
1439         uint32_t advanced;
1440
1441         if(!(hdr.ctl & ACK)) {
1442                 advanced = 0;
1443                 goto skip_ack;
1444         }
1445
1446         // 3. Advance snd.una
1447
1448         advanced = seqdiff(hdr.ack, c->snd.una);
1449
1450         if(advanced) {
1451                 // RTT measurement
1452                 if(c->rtt_start.tv_sec) {
1453                         if(c->rtt_seq == hdr.ack) {
1454                                 struct timespec now;
1455                                 clock_gettime(UTCP_CLOCK, &now);
1456                                 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1457                                 update_rtt(c, diff);
1458                                 c->rtt_start.tv_sec = 0;
1459                         } else if(c->rtt_seq < hdr.ack) {
1460                                 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1461                                 c->rtt_start.tv_sec = 0;
1462                         }
1463                 }
1464
1465                 int32_t data_acked = advanced;
1466
1467                 switch(c->state) {
1468                 case SYN_SENT:
1469                 case SYN_RECEIVED:
1470                         data_acked--;
1471                         break;
1472
1473                 // TODO: handle FIN as well.
1474                 default:
1475                         break;
1476                 }
1477
1478                 assert(data_acked >= 0);
1479
1480 #ifndef NDEBUG
1481                 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1482                 assert(data_acked <= bufused);
1483 #endif
1484
1485                 if(data_acked) {
1486                         buffer_discard(&c->sndbuf, data_acked);
1487                 }
1488
1489                 // Also advance snd.nxt if possible
1490                 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1491                         c->snd.nxt = hdr.ack;
1492                 }
1493
1494                 c->snd.una = hdr.ack;
1495
1496                 if(c->dupack) {
1497                         if(c->dupack >= 3) {
1498                                 debug(c, "fast recovery ended\n");
1499                                 c->snd.cwnd = c->snd.ssthresh;
1500                         }
1501
1502                         c->dupack = 0;
1503                 }
1504
1505                 // Increase the congestion window according to RFC 5681
1506                 if(c->snd.cwnd < c->snd.ssthresh) {
1507                         c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1508                 } else {
1509                         c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1510                 }
1511
1512                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1513                         c->snd.cwnd = c->sndbuf.maxsize;
1514                 }
1515
1516                 debug_cwnd(c);
1517
1518                 // Check if we have sent a FIN that is now ACKed.
1519                 switch(c->state) {
1520                 case FIN_WAIT_1:
1521                         if(c->snd.una == c->snd.last) {
1522                                 set_state(c, FIN_WAIT_2);
1523                         }
1524
1525                         break;
1526
1527                 case CLOSING:
1528                         if(c->snd.una == c->snd.last) {
1529                                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1530                                 c->conn_timeout.tv_sec += utcp->timeout;
1531                                 set_state(c, TIME_WAIT);
1532                         }
1533
1534                         break;
1535
1536                 default:
1537                         break;
1538                 }
1539         } else {
1540                 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1541                         c->dupack++;
1542                         debug(c, "duplicate ACK %d\n", c->dupack);
1543
1544                         if(c->dupack == 3) {
1545                                 // RFC 5681 fast recovery
1546                                 debug(c, "fast recovery started\n", c->dupack);
1547                                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1548                                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1549                                 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1550
1551                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1552                                         c->snd.cwnd = c->sndbuf.maxsize;
1553                                 }
1554
1555                                 debug_cwnd(c);
1556
1557                                 fast_retransmit(c);
1558                         } else if(c->dupack > 3) {
1559                                 c->snd.cwnd += utcp->mss;
1560
1561                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1562                                         c->snd.cwnd = c->sndbuf.maxsize;
1563                                 }
1564
1565                                 debug_cwnd(c);
1566                         }
1567
1568                         // We got an ACK which indicates the other side did get one of our packets.
1569                         // Reset the retransmission timer to avoid going to slow start,
1570                         // but don't touch the connection timeout.
1571                         start_retransmit_timer(c);
1572                 }
1573         }
1574
1575         // 4. Update timers
1576
1577         if(advanced) {
1578                 if(c->snd.una == c->snd.last) {
1579                         stop_retransmit_timer(c);
1580                         timespec_clear(&c->conn_timeout);
1581                 } else if(is_reliable(c)) {
1582                         start_retransmit_timer(c);
1583                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1584                         c->conn_timeout.tv_sec += utcp->timeout;
1585                 }
1586         }
1587
1588 skip_ack:
1589         // 5. Process SYN stuff
1590
1591         if(hdr.ctl & SYN) {
1592                 switch(c->state) {
1593                 case SYN_SENT:
1594
1595                         // This is a SYNACK. It should always have ACKed the SYN.
1596                         if(!advanced) {
1597                                 goto reset;
1598                         }
1599
1600                         c->rcv.irs = hdr.seq;
1601                         c->rcv.nxt = hdr.seq;
1602
1603                         if(c->shut_wr) {
1604                                 c->snd.last++;
1605                                 set_state(c, FIN_WAIT_1);
1606                         } else {
1607                                 set_state(c, ESTABLISHED);
1608                         }
1609
1610                         // TODO: notify application of this somehow.
1611                         break;
1612
1613                 case SYN_RECEIVED:
1614                         // This is a retransmit of a SYN, send back the SYNACK.
1615                         goto synack;
1616
1617                 case ESTABLISHED:
1618                 case FIN_WAIT_1:
1619                 case FIN_WAIT_2:
1620                 case CLOSE_WAIT:
1621                 case CLOSING:
1622                 case LAST_ACK:
1623                 case TIME_WAIT:
1624                         // Ehm, no. We should never receive a second SYN.
1625                         return 0;
1626
1627                 default:
1628 #ifdef UTCP_DEBUG
1629                         abort();
1630 #endif
1631                         return 0;
1632                 }
1633
1634                 // SYN counts as one sequence number
1635                 c->rcv.nxt++;
1636         }
1637
1638         // 6. Process new data
1639
1640         if(c->state == SYN_RECEIVED) {
1641                 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
1642                 if(!advanced) {
1643                         goto reset;
1644                 }
1645
1646                 // Are we still LISTENing?
1647                 if(utcp->accept) {
1648                         utcp->accept(c, c->src);
1649                 }
1650
1651                 if(c->state != ESTABLISHED) {
1652                         set_state(c, CLOSED);
1653                         c->reapable = true;
1654                         goto reset;
1655                 }
1656         }
1657
1658         if(len) {
1659                 switch(c->state) {
1660                 case SYN_SENT:
1661                 case SYN_RECEIVED:
1662                         // This should never happen.
1663 #ifdef UTCP_DEBUG
1664                         abort();
1665 #endif
1666                         return 0;
1667
1668                 case ESTABLISHED:
1669                 case FIN_WAIT_1:
1670                 case FIN_WAIT_2:
1671                         break;
1672
1673                 case CLOSE_WAIT:
1674                 case CLOSING:
1675                 case LAST_ACK:
1676                 case TIME_WAIT:
1677                         // Ehm no, We should never receive more data after a FIN.
1678                         goto reset;
1679
1680                 default:
1681 #ifdef UTCP_DEBUG
1682                         abort();
1683 #endif
1684                         return 0;
1685                 }
1686
1687                 handle_incoming_data(c, hdr.seq, ptr, len);
1688         }
1689
1690         // 7. Process FIN stuff
1691
1692         if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
1693                 switch(c->state) {
1694                 case SYN_SENT:
1695                 case SYN_RECEIVED:
1696                         // This should never happen.
1697 #ifdef UTCP_DEBUG
1698                         abort();
1699 #endif
1700                         break;
1701
1702                 case ESTABLISHED:
1703                         set_state(c, CLOSE_WAIT);
1704                         break;
1705
1706                 case FIN_WAIT_1:
1707                         set_state(c, CLOSING);
1708                         break;
1709
1710                 case FIN_WAIT_2:
1711                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1712                         c->conn_timeout.tv_sec += utcp->timeout;
1713                         set_state(c, TIME_WAIT);
1714                         break;
1715
1716                 case CLOSE_WAIT:
1717                 case CLOSING:
1718                 case LAST_ACK:
1719                 case TIME_WAIT:
1720                         // Ehm, no. We should never receive a second FIN.
1721                         goto reset;
1722
1723                 default:
1724 #ifdef UTCP_DEBUG
1725                         abort();
1726 #endif
1727                         break;
1728                 }
1729
1730                 // FIN counts as one sequence number
1731                 c->rcv.nxt++;
1732                 len++;
1733
1734                 // Inform the application that the peer closed its end of the connection.
1735                 if(c->recv) {
1736                         errno = 0;
1737                         c->recv(c, NULL, 0);
1738                 }
1739         }
1740
1741         // Now we send something back if:
1742         // - we received data, so we have to send back an ACK
1743         //   -> sendatleastone = true
1744         // - or we got an ack, so we should maybe send a bit more data
1745         //   -> sendatleastone = false
1746
1747         if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
1748                 ack(c, has_data);
1749         }
1750
1751         return 0;
1752
1753 reset:
1754         swap_ports(&hdr);
1755         hdr.wnd = 0;
1756         hdr.aux = 0;
1757
1758         if(hdr.ctl & ACK) {
1759                 hdr.seq = hdr.ack;
1760                 hdr.ctl = RST;
1761         } else {
1762                 hdr.ack = hdr.seq + len;
1763                 hdr.seq = 0;
1764                 hdr.ctl = RST | ACK;
1765         }
1766
1767         print_packet(c, "send", &hdr, sizeof(hdr));
1768         utcp->send(utcp, &hdr, sizeof(hdr));
1769         return 0;
1770
1771 }
1772
/* Shut down the read side, the write side, or both sides of a connection.
 *
 * dir must be UTCP_SHUT_RD, UTCP_SHUT_WR or UTCP_SHUT_RDWR.
 * Shutting down reads only drops the recv callback; incoming data is then
 * ignored. Shutting down writes is processed at most once per connection.
 *
 * Returns 0 on success, or -1 with errno set to:
 *   EFAULT   if c is NULL,
 *   EBADF    if c was already closed/aborted (c->reapable is set),
 *   EINVAL   if dir is not one of the UTCP_SHUT_* values,
 *   ENOTCONN if the write side is shut down while in CLOSED or LISTEN.
 */
int utcp_shutdown(struct utcp_connection *c, int dir) {
	debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);

	if(!c) {
		errno = EFAULT;
		return -1;
	}

	if(c->reapable) {
		debug(c, "shutdown() called on closed connection\n");
		errno = EBADF;
		return -1;
	}

	if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
		errno = EINVAL;
		return -1;
	}

	// TCP does not have a provision for stopping incoming packets.
	// The best we can do is to just ignore them.
	if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
		c->recv = NULL;
	}

	// The rest of the code deals with shutting down writes.
	if(dir == UTCP_SHUT_RD) {
		return 0;
	}

	// Only process shutting down writes once.
	// NOTE(review): shut_wr is set before the state check below, so for a
	// connection in CLOSED/LISTEN only the first call reports ENOTCONN;
	// subsequent calls return 0 here.
	if(c->shut_wr) {
		return 0;
	}

	c->shut_wr = true;

	switch(c->state) {
	case CLOSED:
	case LISTEN:
		errno = ENOTCONN;
		return -1;

	case SYN_SENT:
		// Handshake not finished: only the shut_wr flag is recorded here.
		// Presumably the FIN is sent once the handshake completes — TODO confirm.
		return 0;

	case SYN_RECEIVED:
	case ESTABLISHED:
		set_state(c, FIN_WAIT_1);
		break;

	case FIN_WAIT_1:
	case FIN_WAIT_2:
		return 0;

	case CLOSE_WAIT:
		// NOTE(review): RFC 793 moves CLOSE_WAIT to LAST_ACK on close; this
		// code uses CLOSING instead — confirm the receive path expects that.
		set_state(c, CLOSING);
		break;

	case CLOSING:
	case LAST_ACK:
	case TIME_WAIT:
		return 0;
	}

	// Reserve one sequence number, presumably for the FIN that ack() emits
	// in the new state — TODO confirm. Then make sure the retransmit timer
	// runs so the FIN is resent if it gets lost.
	c->snd.last++;

	ack(c, false);

	if(!timespec_isset(&c->rtrx_timeout)) {
		start_retransmit_timer(c);
	}

	return 0;
}
1848
1849 static bool reset_connection(struct utcp_connection *c) {
1850         if(!c) {
1851                 errno = EFAULT;
1852                 return false;
1853         }
1854
1855         if(c->reapable) {
1856                 debug(c, "abort() called on closed connection\n");
1857                 errno = EBADF;
1858                 return false;
1859         }
1860
1861         c->recv = NULL;
1862         c->poll = NULL;
1863
1864         switch(c->state) {
1865         case CLOSED:
1866                 return true;
1867
1868         case LISTEN:
1869         case SYN_SENT:
1870         case CLOSING:
1871         case LAST_ACK:
1872         case TIME_WAIT:
1873                 set_state(c, CLOSED);
1874                 return true;
1875
1876         case SYN_RECEIVED:
1877         case ESTABLISHED:
1878         case FIN_WAIT_1:
1879         case FIN_WAIT_2:
1880         case CLOSE_WAIT:
1881                 set_state(c, CLOSED);
1882                 break;
1883         }
1884
1885         // Send RST
1886
1887         struct hdr hdr;
1888
1889         hdr.src = c->src;
1890         hdr.dst = c->dst;
1891         hdr.seq = c->snd.nxt;
1892         hdr.ack = 0;
1893         hdr.wnd = 0;
1894         hdr.ctl = RST;
1895
1896         print_packet(c, "send", &hdr, sizeof(hdr));
1897         c->utcp->send(c->utcp, &hdr, sizeof(hdr));
1898         return true;
1899 }
1900
1901 // Closes all the opened connections
1902 void utcp_abort_all_connections(struct utcp *utcp) {
1903         if(!utcp) {
1904                 errno = EINVAL;
1905                 return;
1906         }
1907
1908         for(int i = 0; i < utcp->nconnections; i++) {
1909                 struct utcp_connection *c = utcp->connections[i];
1910
1911                 if(c->reapable || c->state == CLOSED) {
1912                         continue;
1913                 }
1914
1915                 utcp_recv_t old_recv = c->recv;
1916                 utcp_poll_t old_poll = c->poll;
1917
1918                 reset_connection(c);
1919
1920                 if(old_recv) {
1921                         errno = 0;
1922                         old_recv(c, NULL, 0);
1923                 }
1924
1925                 if(old_poll && !c->reapable) {
1926                         errno = 0;
1927                         old_poll(c, 0);
1928                 }
1929         }
1930
1931         return;
1932 }
1933
1934 int utcp_close(struct utcp_connection *c) {
1935         if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
1936                 return -1;
1937         }
1938
1939         c->recv = NULL;
1940         c->poll = NULL;
1941         c->reapable = true;
1942         return 0;
1943 }
1944
1945 int utcp_abort(struct utcp_connection *c) {
1946         if(!reset_connection(c)) {
1947                 return -1;
1948         }
1949
1950         c->reapable = true;
1951         return 0;
1952 }
1953
/* Handle timeouts.
 * One call to this function loops through all connections and:
 *  - frees connections that are CLOSED and were marked reapable
 *    (by utcp_close()/utcp_abort());
 *  - closes connections whose connection timer (conn_timeout) has expired,
 *    setting errno to ETIMEDOUT before notifying the recv/poll callbacks;
 *  - retransmits on connections whose retransmit timer (rtrx_timeout) expired;
 *  - calls the poll callback with the free send-buffer space of connections in
 *    ESTABLISHED or CLOSE_WAIT, if that space is non-zero.
 * The return value is the time remaining until the earliest pending timer,
 * as a relative struct timespec. If no timer is pending, it is one hour
 * (3600 seconds).
 */
struct timespec utcp_timeout(struct utcp *utcp) {
	struct timespec now;
	clock_gettime(UTCP_CLOCK, &now);
	// Upper bound on the returned timeout when no timer is pending.
	struct timespec next = {now.tv_sec + 3600, now.tv_nsec};

	for(int i = 0; i < utcp->nconnections; i++) {
		struct utcp_connection *c = utcp->connections[i];

		if(!c) {
			continue;
		}

		// delete connections that have been utcp_close()d.
		if(c->state == CLOSED) {
			if(c->reapable) {
				debug(c, "reaping\n");
				free_connection(c);
				// Presumably free_connection() moves another connection into
				// slot i, so revisit this index — TODO confirm.
				i--;
			}

			continue;
		}

		// Connection timer expired: give up on the connection.
		if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
			errno = ETIMEDOUT;
			// NOTE(review): assigns the state directly rather than via set_state().
			c->state = CLOSED;

			if(c->recv) {
				c->recv(c, NULL, 0);
			}

			// The recv callback may have closed the connection.
			if(c->poll && !c->reapable) {
				c->poll(c, 0);
			}

			continue;
		}

		if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
			debug(c, "retransmitting after timeout\n");
			retransmit(c);
		}

		if(c->poll) {
			if((c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
				uint32_t len =  buffer_free(&c->sndbuf);

				if(len) {
					c->poll(c, len);
				}
			} else if(c->state == CLOSED) {
				c->poll(c, 0);
			}
		}

		// Track the earliest pending timer for the return value.
		if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
			next = c->conn_timeout;
		}

		if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
			next = c->rtrx_timeout;
		}
	}

	struct timespec diff;

	timespec_sub(&next, &now, &diff);

	return diff;
}
2030
2031 bool utcp_is_active(struct utcp *utcp) {
2032         if(!utcp) {
2033                 return false;
2034         }
2035
2036         for(int i = 0; i < utcp->nconnections; i++)
2037                 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
2038                         return true;
2039                 }
2040
2041         return false;
2042 }
2043
2044 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2045         if(!send) {
2046                 errno = EFAULT;
2047                 return NULL;
2048         }
2049
2050         struct utcp *utcp = calloc(1, sizeof(*utcp));
2051
2052         if(!utcp) {
2053                 return NULL;
2054         }
2055
2056         if(!CLOCK_GRANULARITY) {
2057                 struct timespec res;
2058                 clock_getres(UTCP_CLOCK, &res);
2059                 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2060         }
2061
2062         utcp->accept = accept;
2063         utcp->pre_accept = pre_accept;
2064         utcp->send = send;
2065         utcp->priv = priv;
2066         utcp_set_mtu(utcp, DEFAULT_MTU);
2067         utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
2068         utcp->rto = START_RTO; // usec
2069
2070         return utcp;
2071 }
2072
2073 void utcp_exit(struct utcp *utcp) {
2074         if(!utcp) {
2075                 return;
2076         }
2077
2078         for(int i = 0; i < utcp->nconnections; i++) {
2079                 struct utcp_connection *c = utcp->connections[i];
2080
2081                 if(!c->reapable) {
2082                         if(c->recv) {
2083                                 c->recv(c, NULL, 0);
2084                         }
2085
2086                         if(c->poll && !c->reapable) {
2087                                 c->poll(c, 0);
2088                         }
2089                 }
2090
2091                 buffer_exit(&c->rcvbuf);
2092                 buffer_exit(&c->sndbuf);
2093                 free(c);
2094         }
2095
2096         free(utcp->connections);
2097         free(utcp);
2098 }
2099
2100 uint16_t utcp_get_mtu(struct utcp *utcp) {
2101         return utcp ? utcp->mtu : 0;
2102 }
2103
2104 uint16_t utcp_get_mss(struct utcp *utcp) {
2105         return utcp ? utcp->mss : 0;
2106 }
2107
2108 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2109         if(!utcp) {
2110                 return;
2111         }
2112
2113         if(mtu <= sizeof(struct hdr)) {
2114                 return;
2115         }
2116
2117         if(mtu > utcp->mtu) {
2118                 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2119
2120                 if(!new) {
2121                         return;
2122                 }
2123
2124                 utcp->pkt = new;
2125         }
2126
2127         utcp->mtu = mtu;
2128         utcp->mss = mtu - sizeof(struct hdr);
2129 }
2130
2131 void utcp_reset_timers(struct utcp *utcp) {
2132         if(!utcp) {
2133                 return;
2134         }
2135
2136         struct timespec now, then;
2137
2138         clock_gettime(UTCP_CLOCK, &now);
2139
2140         then = now;
2141
2142         then.tv_sec += utcp->timeout;
2143
2144         for(int i = 0; i < utcp->nconnections; i++) {
2145                 struct utcp_connection *c = utcp->connections[i];
2146
2147                 if(c->reapable) {
2148                         continue;
2149                 }
2150
2151                 if(timespec_isset(&c->rtrx_timeout)) {
2152                         c->rtrx_timeout = now;
2153                 }
2154
2155                 if(timespec_isset(&c->conn_timeout)) {
2156                         c->conn_timeout = then;
2157                 }
2158
2159                 c->rtt_start.tv_sec = 0;
2160         }
2161
2162         if(utcp->rto > START_RTO) {
2163                 utcp->rto = START_RTO;
2164         }
2165 }
2166
2167 int utcp_get_user_timeout(struct utcp *u) {
2168         return u ? u->timeout : 0;
2169 }
2170
2171 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2172         if(u) {
2173                 u->timeout = timeout;
2174         }
2175 }
2176
2177 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2178         return c ? c->sndbuf.maxsize : 0;
2179 }
2180
2181 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2182         if(!c) {
2183                 return 0;
2184         }
2185
2186         switch(c->state) {
2187         case SYN_SENT:
2188         case SYN_RECEIVED:
2189         case ESTABLISHED:
2190         case CLOSE_WAIT:
2191                 return buffer_free(&c->sndbuf);
2192
2193         default:
2194                 return 0;
2195         }
2196 }
2197
2198 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2199         if(!c) {
2200                 return;
2201         }
2202
2203         c->sndbuf.maxsize = size;
2204
2205         if(c->sndbuf.maxsize != size) {
2206                 c->sndbuf.maxsize = -1;
2207         }
2208 }
2209
2210 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2211         return c ? c->rcvbuf.maxsize : 0;
2212 }
2213
2214 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2215         if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2216                 return buffer_free(&c->rcvbuf);
2217         } else {
2218                 return 0;
2219         }
2220 }
2221
2222 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2223         if(!c) {
2224                 return;
2225         }
2226
2227         c->rcvbuf.maxsize = size;
2228
2229         if(c->rcvbuf.maxsize != size) {
2230                 c->rcvbuf.maxsize = -1;
2231         }
2232 }
2233
2234 size_t utcp_get_sendq(struct utcp_connection *c) {
2235         return c->sndbuf.used;
2236 }
2237
2238 size_t utcp_get_recvq(struct utcp_connection *c) {
2239         return c->rcvbuf.used;
2240 }
2241
2242 bool utcp_get_nodelay(struct utcp_connection *c) {
2243         return c ? c->nodelay : false;
2244 }
2245
2246 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2247         if(c) {
2248                 c->nodelay = nodelay;
2249         }
2250 }
2251
2252 bool utcp_get_keepalive(struct utcp_connection *c) {
2253         return c ? c->keepalive : false;
2254 }
2255
2256 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2257         if(c) {
2258                 c->keepalive = keepalive;
2259         }
2260 }
2261
2262 size_t utcp_get_outq(struct utcp_connection *c) {
2263         return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
2264 }
2265
2266 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
2267         if(c) {
2268                 c->recv = recv;
2269         }
2270 }
2271
2272 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2273         if(c) {
2274                 c->poll = poll;
2275         }
2276 }
2277
2278 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2279         if(utcp) {
2280                 utcp->accept = accept;
2281                 utcp->pre_accept = pre_accept;
2282         }
2283 }
2284
2285 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2286         if(!c || c->reapable) {
2287                 return;
2288         }
2289
2290         if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2291                 return;
2292         }
2293
2294         if(expect) {
2295                 // If we expect data, start the connection timer.
2296                 if(!timespec_isset(&c->conn_timeout)) {
2297                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2298                         c->conn_timeout.tv_sec += c->utcp->timeout;
2299                 }
2300         } else {
2301                 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2302                 if(c->snd.una == c->snd.last) {
2303                         timespec_clear(&c->conn_timeout);
2304                 }
2305         }
2306 }
2307
2308 void utcp_offline(struct utcp *utcp, bool offline) {
2309         struct timespec now;
2310         clock_gettime(UTCP_CLOCK, &now);
2311
2312         for(int i = 0; i < utcp->nconnections; i++) {
2313                 struct utcp_connection *c = utcp->connections[i];
2314
2315                 if(c->reapable) {
2316                         continue;
2317                 }
2318
2319                 utcp_expect_data(c, offline);
2320
2321                 if(!offline) {
2322                         if(timespec_isset(&c->rtrx_timeout)) {
2323                                 c->rtrx_timeout = now;
2324                         }
2325
2326                         utcp->connections[i]->rtt_start.tv_sec = 0;
2327                 }
2328         }
2329
2330         if(!offline && utcp->rto > START_RTO) {
2331                 utcp->rto = START_RTO;
2332         }
2333 }
2334
2335 void utcp_set_clock_granularity(long granularity) {
2336         CLOCK_GRANULARITY = granularity;
2337 }