]> git.meshlink.io Git - utcp/blob - utcp.c
Fix buffer resizing.
[utcp] / utcp.c
1 /*
2     utcp.c -- Userspace TCP
3     Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <time.h>
31
32 #include "utcp_priv.h"
33
34 #ifndef EBADMSG
35 #define EBADMSG         104
36 #endif
37
38 #ifndef SHUT_RDWR
39 #define SHUT_RDWR 2
40 #endif
41
42 #ifdef poll
43 #undef poll
44 #endif
45
46 #ifndef UTCP_CLOCK
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
49 #else
50 #define UTCP_CLOCK CLOCK_MONOTONIC
51 #endif
52 #endif
53
54 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
55         r->tv_sec = a->tv_sec - b->tv_sec;
56         r->tv_nsec = a->tv_nsec - b->tv_nsec;
57
58         if(r->tv_nsec < 0) {
59                 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
60         }
61 }
62
/* Return a - b expressed in microseconds.
 *
 * The difference is computed in 64-bit nanoseconds and then divided by 1000.
 * The result is narrowed to int32_t, so only differences within the int32_t
 * range are representable.
 */
static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
	// Widen before multiplying so the seconds term cannot overflow a narrow time_t,
	// and add the *nanosecond* difference (not the seconds difference again).
	int64_t diff = (int64_t)(a->tv_sec - b->tv_sec) * 1000000000 + (a->tv_nsec - b->tv_nsec);
	return diff / 1000;
}
67
/* Return true when time a is strictly earlier than time b.
 * Seconds are compared first; nanoseconds only break a tie.
 */
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
	if(a->tv_sec != b->tv_sec) {
		return a->tv_sec < b->tv_sec;
	}

	return a->tv_nsec < b->tv_nsec;
}
75
/* Mark a timestamp as "not set" by zeroing its seconds field.
 * The nanosecond field is left untouched; timespec_isset() only looks at tv_sec.
 */
static void timespec_clear(struct timespec *a) {
	a->tv_sec = (time_t)0;
}
79
/* Return true when the timestamp has a non-zero seconds field,
 * i.e. it has not been cleared with timespec_clear().
 */
static bool timespec_isset(const struct timespec *a) {
	return a->tv_sec != 0;
}
83
84 static long CLOCK_GRANULARITY; // usec
85
/* Return the smaller of two sizes. */
static inline size_t min(size_t a, size_t b) {
	if(b < a) {
		return b;
	}

	return a;
}
89
/* Return the larger of two sizes. */
static inline size_t max(size_t a, size_t b) {
	if(b > a) {
		return b;
	}

	return a;
}
93
94 #ifdef UTCP_DEBUG
95 #include <stdarg.h>
96
97 #ifndef UTCP_DEBUG_DATALEN
98 #define UTCP_DEBUG_DATALEN 20
99 #endif
100
101 static void debug(struct utcp_connection *c, const char *format, ...) {
102         struct timespec tv;
103         char buf[1024];
104         int len;
105
106         clock_gettime(CLOCK_REALTIME, &tv);
107         len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);
108         va_list ap;
109         va_start(ap, format);
110         len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
111         va_end(ap);
112
113         if(len > 0 && (size_t)len < sizeof(buf)) {
114                 fwrite(buf, len, 1, stderr);
115         }
116 }
117
118 static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
119         struct hdr hdr;
120
121         if(len < sizeof(hdr)) {
122                 debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
123                 return;
124         }
125
126         memcpy(&hdr, pkt, sizeof(hdr));
127
128         uint32_t datalen;
129
130         if(len > sizeof(hdr)) {
131                 datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
132         } else {
133                 datalen = 0;
134         }
135
136
137         const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
138         char str[datalen * 2 + 1];
139         char *p = str;
140
141         for(uint32_t i = 0; i < datalen; i++) {
142                 *p++ = "0123456789ABCDEF"[data[i] >> 4];
143                 *p++ = "0123456789ABCDEF"[data[i] & 15];
144         }
145
146         *p = 0;
147
148         debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s data %s\n",
149               dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
150               hdr.ctl & SYN ? "SYN" : "",
151               hdr.ctl & RST ? "RST" : "",
152               hdr.ctl & FIN ? "FIN" : "",
153               hdr.ctl & ACK ? "ACK" : "",
154               str
155              );
156 }
157
158 static void debug_cwnd(struct utcp_connection *c) {
159         debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
160 }
161 #else
162 #define debug(...) do {} while(0)
163 #define print_packet(...) do {} while(0)
164 #define debug_cwnd(...) do {} while(0)
165 #endif
166
167 static void set_state(struct utcp_connection *c, enum state state) {
168         c->state = state;
169
170         if(state == ESTABLISHED) {
171                 timespec_clear(&c->conn_timeout);
172         }
173
174         debug(c, "state %s\n", strstate[state]);
175 }
176
177 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
178         if(seq != c->snd.last) {
179                 return false;
180         }
181
182         switch(c->state) {
183         case FIN_WAIT_1:
184         case CLOSING:
185         case LAST_ACK:
186                 return true;
187
188         default:
189                 return false;
190         }
191 }
192
193 static bool is_reliable(struct utcp_connection *c) {
194         return c->flags & UTCP_RELIABLE;
195 }
196
/* Return the signed distance a - b between two 32-bit sequence numbers.
 * The subtraction is done modulo 2^32 and reinterpreted as signed, so the
 * result is correct across sequence number wrap-around.
 */
static int32_t seqdiff(uint32_t a, uint32_t b) {
	const uint32_t delta = a - b;

	return (int32_t)delta;
}
200
201 // Buffer functions
202 static bool buffer_wraps(struct buffer *buf) {
203         return buf->size - buf->offset < buf->used;
204 }
205
206 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
207         char *newdata = realloc(buf->data, newsize);
208
209         if(!newdata) {
210                 return false;
211         }
212
213         buf->data = newdata;
214
215         if(buffer_wraps(buf)) {
216                 // Shift the right part of the buffer until it hits the end of the new buffer.
217                 // Old situation:
218                 // [345......012]
219                 // New situation:
220                 // [345.........|........012]
221                 uint32_t tailsize = buf->size - buf->offset;
222                 uint32_t newoffset = newsize - tailsize;
223                 memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
224                 buf->offset = newoffset;
225         }
226
227         buf->size = newsize;
228         return true;
229 }
230
231 // Store data into the buffer
232 static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
233         debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
234
235         // Ensure we don't store more than maxsize bytes in total
236         size_t required = offset + len;
237
238         if(required > buf->maxsize) {
239                 if(offset >= buf->maxsize) {
240                         return 0;
241                 }
242
243                 len = buf->maxsize - offset;
244                 required = buf->maxsize;
245         }
246
247         // Check if we need to resize the buffer
248         if(required > buf->size) {
249                 size_t newsize = buf->size;
250
251                 if(!newsize) {
252                         newsize = 4096;
253                 }
254
255                 do {
256                         newsize *= 2;
257                 } while(newsize < required);
258
259                 if(newsize > buf->maxsize) {
260                         newsize = buf->maxsize;
261                 }
262
263                 if(!buffer_resize(buf, newsize)) {
264                         return -1;
265                 }
266         }
267
268         uint32_t realoffset = buf->offset + offset;
269
270         if(buf->size - buf->offset < offset) {
271                 // The offset wrapped
272                 realoffset -= buf->size;
273         }
274
275         if(buf->size - realoffset < len) {
276                 // The new chunk of data must be wrapped
277                 memcpy(buf->data + realoffset, data, buf->size - realoffset);
278                 memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
279         } else {
280                 memcpy(buf->data + realoffset, data, len);
281         }
282
283         if(required > buf->used) {
284                 buf->used = required;
285         }
286
287         return len;
288 }
289
290 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
291         return buffer_put_at(buf, buf->used, data, len);
292 }
293
294 // Copy data from the buffer without removing it.
295 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
296         // Ensure we don't copy more than is actually stored in the buffer
297         if(offset >= buf->used) {
298                 return 0;
299         }
300
301         if(buf->used - offset < len) {
302                 len = buf->used - offset;
303         }
304
305         uint32_t realoffset = buf->offset + offset;
306
307         if(buf->size - buf->offset < offset) {
308                 // The offset wrapped
309                 realoffset -= buf->size;
310         }
311
312         if(buf->size - realoffset < len) {
313                 // The data is wrapped
314                 memcpy(data, buf->data + realoffset, buf->size - realoffset);
315                 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
316         } else {
317                 memcpy(data, buf->data + realoffset, len);
318         }
319
320         return len;
321 }
322
323 // Discard data from the buffer.
324 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
325         if(buf->used < len) {
326                 len = buf->used;
327         }
328
329         if(buf->size - buf->offset < len) {
330                 buf->offset -= buf->size;
331         }
332
333         buf->offset += len;
334         buf->used -= len;
335
336         return len;
337 }
338
339 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
340         if(maxsize < minsize) {
341                 maxsize = minsize;
342         }
343
344         buf->maxsize = maxsize;
345
346         return buf->size >= minsize || buffer_resize(buf, minsize);
347 }
348
349 static void buffer_exit(struct buffer *buf) {
350         free(buf->data);
351         memset(buf, 0, sizeof(*buf));
352 }
353
354 static uint32_t buffer_free(const struct buffer *buf) {
355         return buf->maxsize - buf->used;
356 }
357
358 // Connections are stored in a sorted list.
359 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
360
361 static int compare(const void *va, const void *vb) {
362         assert(va && vb);
363
364         const struct utcp_connection *a = *(struct utcp_connection **)va;
365         const struct utcp_connection *b = *(struct utcp_connection **)vb;
366
367         assert(a && b);
368         assert(a->src && b->src);
369
370         int c = (int)a->src - (int)b->src;
371
372         if(c) {
373                 return c;
374         }
375
376         c = (int)a->dst - (int)b->dst;
377         return c;
378 }
379
380 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
381         if(!utcp->nconnections) {
382                 return NULL;
383         }
384
385         struct utcp_connection key = {
386                 .src = src,
387                 .dst = dst,
388         }, *keyp = &key;
389         struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
390         return match ? *match : NULL;
391 }
392
393 static void free_connection(struct utcp_connection *c) {
394         struct utcp *utcp = c->utcp;
395         struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
396
397         assert(cp);
398
399         int i = cp - utcp->connections;
400         memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
401         utcp->nconnections--;
402
403         buffer_exit(&c->rcvbuf);
404         buffer_exit(&c->sndbuf);
405         free(c);
406 }
407
408 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
409         // Check whether this combination of src and dst is free
410
411         if(src) {
412                 if(find_connection(utcp, src, dst)) {
413                         errno = EADDRINUSE;
414                         return NULL;
415                 }
416         } else { // If src == 0, generate a random port number with the high bit set
417                 if(utcp->nconnections >= 32767) {
418                         errno = ENOMEM;
419                         return NULL;
420                 }
421
422                 src = rand() | 0x8000;
423
424                 while(find_connection(utcp, src, dst)) {
425                         src++;
426                 }
427         }
428
429         // Allocate memory for the new connection
430
431         if(utcp->nconnections >= utcp->nallocated) {
432                 if(!utcp->nallocated) {
433                         utcp->nallocated = 4;
434                 } else {
435                         utcp->nallocated *= 2;
436                 }
437
438                 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
439
440                 if(!new_array) {
441                         return NULL;
442                 }
443
444                 utcp->connections = new_array;
445         }
446
447         struct utcp_connection *c = calloc(1, sizeof(*c));
448
449         if(!c) {
450                 return NULL;
451         }
452
453         if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
454                 free(c);
455                 return NULL;
456         }
457
458         if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
459                 buffer_exit(&c->sndbuf);
460                 free(c);
461                 return NULL;
462         }
463
464         // Fill in the details
465
466         c->src = src;
467         c->dst = dst;
468 #ifdef UTCP_DEBUG
469         c->snd.iss = 0;
470 #else
471         c->snd.iss = rand();
472 #endif
473         c->snd.una = c->snd.iss;
474         c->snd.nxt = c->snd.iss + 1;
475         c->snd.last = c->snd.nxt;
476         c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
477         c->snd.ssthresh = ~0;
478         debug_cwnd(c);
479         c->utcp = utcp;
480
481         // Add it to the sorted list of connections
482
483         utcp->connections[utcp->nconnections++] = c;
484         qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
485
486         return c;
487 }
488
/* Return the absolute difference |a - b| of two unsigned 32-bit values. */
static inline uint32_t absdiff(uint32_t a, uint32_t b) {
	return a > b ? a - b : b - a;
}
496
497 // Update RTT variables. See RFC 6298.
498 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
499         if(!rtt) {
500                 debug(c, "invalid rtt\n");
501                 return;
502         }
503
504         struct utcp *utcp = c->utcp;
505
506         if(!utcp->srtt) {
507                 utcp->srtt = rtt;
508                 utcp->rttvar = rtt / 2;
509         } else {
510                 utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4;
511                 utcp->srtt = (utcp->srtt * 7 + rtt) / 8;
512         }
513
514         utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY);
515
516         if(utcp->rto > MAX_RTO) {
517                 utcp->rto = MAX_RTO;
518         }
519
520         debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, utcp->srtt, utcp->rttvar, utcp->rto);
521 }
522
523 static void start_retransmit_timer(struct utcp_connection *c) {
524         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
525
526         uint32_t rto = c->utcp->rto;
527
528         while(rto > USEC_PER_SEC) {
529                 c->rtrx_timeout.tv_sec++;
530                 rto -= USEC_PER_SEC;
531         }
532
533         c->rtrx_timeout.tv_nsec += c->utcp->rto * 1000;
534
535         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
536                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
537                 c->rtrx_timeout.tv_sec++;
538         }
539
540         debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
541 }
542
543 static void stop_retransmit_timer(struct utcp_connection *c) {
544         timespec_clear(&c->rtrx_timeout);
545         debug(c, "rtrx_timeout cleared\n");
546 }
547
548 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
549         struct utcp_connection *c = allocate_connection(utcp, 0, dst);
550
551         if(!c) {
552                 return NULL;
553         }
554
555         assert((flags & ~0x1f) == 0);
556
557         c->flags = flags;
558         c->recv = recv;
559         c->priv = priv;
560
561         struct {
562                 struct hdr hdr;
563                 uint8_t init[4];
564         } pkt;
565
566         pkt.hdr.src = c->src;
567         pkt.hdr.dst = c->dst;
568         pkt.hdr.seq = c->snd.iss;
569         pkt.hdr.ack = 0;
570         pkt.hdr.wnd = c->rcvbuf.maxsize;
571         pkt.hdr.ctl = SYN;
572         pkt.hdr.aux = 0x0101;
573         pkt.init[0] = 1;
574         pkt.init[1] = 0;
575         pkt.init[2] = 0;
576         pkt.init[3] = flags & 0x7;
577
578         set_state(c, SYN_SENT);
579
580         print_packet(c, "send", &pkt, sizeof(pkt));
581         utcp->send(utcp, &pkt, sizeof(pkt));
582
583         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
584         c->conn_timeout.tv_sec += utcp->timeout;
585
586         start_retransmit_timer(c);
587
588         return c;
589 }
590
591 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
592         return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
593 }
594
595 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
596         if(c->reapable || c->state != SYN_RECEIVED) {
597                 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
598                 return;
599         }
600
601         debug(c, "accepted %p %p\n", c, recv, priv);
602         c->recv = recv;
603         c->priv = priv;
604         set_state(c, ESTABLISHED);
605 }
606
607 static void ack(struct utcp_connection *c, bool sendatleastone) {
608         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
609         int32_t cwndleft = min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una);
610
611         assert(left >= 0);
612
613         if(cwndleft <= 0) {
614                 left = 0;
615         } else if(cwndleft < left) {
616                 left = cwndleft;
617
618                 if(!sendatleastone || cwndleft > c->utcp->mss) {
619                         left -= left % c->utcp->mss;
620                 }
621         }
622
623         debug(c, "cwndleft %d left %d\n", cwndleft, left);
624
625         if(!left && !sendatleastone) {
626                 return;
627         }
628
629         struct {
630                 struct hdr hdr;
631                 uint8_t data[];
632         } *pkt = c->utcp->pkt;
633
634         pkt->hdr.src = c->src;
635         pkt->hdr.dst = c->dst;
636         pkt->hdr.ack = c->rcv.nxt;
637         pkt->hdr.wnd = c->rcvbuf.maxsize;
638         pkt->hdr.ctl = ACK;
639         pkt->hdr.aux = 0;
640
641         do {
642                 uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
643                 pkt->hdr.seq = c->snd.nxt;
644
645                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
646
647                 c->snd.nxt += seglen;
648                 left -= seglen;
649
650                 if(seglen && fin_wanted(c, c->snd.nxt)) {
651                         seglen--;
652                         pkt->hdr.ctl |= FIN;
653                 }
654
655                 if(!c->rtt_start.tv_sec) {
656                         // Start RTT measurement
657                         clock_gettime(UTCP_CLOCK, &c->rtt_start);
658                         c->rtt_seq = pkt->hdr.seq + seglen;
659                         debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
660                 }
661
662                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
663                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
664         } while(left);
665 }
666
667 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
668         if(c->reapable) {
669                 debug(c, "send() called on closed connection\n");
670                 errno = EBADF;
671                 return -1;
672         }
673
674         switch(c->state) {
675         case CLOSED:
676         case LISTEN:
677                 debug(c, "send() called on unconnected connection\n");
678                 errno = ENOTCONN;
679                 return -1;
680
681         case SYN_SENT:
682         case SYN_RECEIVED:
683         case ESTABLISHED:
684         case CLOSE_WAIT:
685                 break;
686
687         case FIN_WAIT_1:
688         case FIN_WAIT_2:
689         case CLOSING:
690         case LAST_ACK:
691         case TIME_WAIT:
692                 debug(c, "send() called on closed connection\n");
693                 errno = EPIPE;
694                 return -1;
695         }
696
697         // Exit early if we have nothing to send.
698
699         if(!len) {
700                 return 0;
701         }
702
703         if(!data) {
704                 errno = EFAULT;
705                 return -1;
706         }
707
708         // Check if we need to be able to buffer all data
709
710         if(c->flags & UTCP_NO_PARTIAL) {
711                 if(len > buffer_free(&c->sndbuf)) {
712                         if(len > c->sndbuf.maxsize) {
713                                 errno = EMSGSIZE;
714                                 return -1;
715                         } else {
716                                 errno = EWOULDBLOCK;
717                                 return 0;
718                         }
719                 }
720         }
721
722         // Add data to send buffer.
723
724         if(is_reliable(c) || (c->state != SYN_SENT && c->state != SYN_RECEIVED)) {
725                 len = buffer_put(&c->sndbuf, data, len);
726         } else {
727                 return 0;
728         }
729
730         if(len <= 0) {
731                 if(is_reliable(c)) {
732                         errno = EWOULDBLOCK;
733                         return 0;
734                 } else {
735                         return len;
736                 }
737         }
738
739         c->snd.last += len;
740
741         // Don't send anything yet if the connection has not fully established yet
742
743         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
744                 return len;
745         }
746
747         ack(c, false);
748
749         if(!is_reliable(c)) {
750                 c->snd.una = c->snd.nxt = c->snd.last;
751                 buffer_discard(&c->sndbuf, c->sndbuf.used);
752                 c->do_poll = true;
753         }
754
755         if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) {
756                 start_retransmit_timer(c);
757         }
758
759         if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) {
760                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
761                 c->conn_timeout.tv_sec += c->utcp->timeout;
762         }
763
764         return len;
765 }
766
767 static void swap_ports(struct hdr *hdr) {
768         uint16_t tmp = hdr->src;
769         hdr->src = hdr->dst;
770         hdr->dst = tmp;
771 }
772
773 static void fast_retransmit(struct utcp_connection *c) {
774         if(c->state == CLOSED || c->snd.last == c->snd.una) {
775                 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
776                 return;
777         }
778
779         struct utcp *utcp = c->utcp;
780
781         struct {
782                 struct hdr hdr;
783                 uint8_t data[];
784         } *pkt;
785
786         pkt = malloc(c->utcp->mtu);
787
788         if(!pkt) {
789                 return;
790         }
791
792         pkt->hdr.src = c->src;
793         pkt->hdr.dst = c->dst;
794         pkt->hdr.wnd = c->rcvbuf.maxsize;
795         pkt->hdr.aux = 0;
796
797         switch(c->state) {
798         case ESTABLISHED:
799         case FIN_WAIT_1:
800         case CLOSE_WAIT:
801         case CLOSING:
802         case LAST_ACK:
803                 // Send unacked data again.
804                 pkt->hdr.seq = c->snd.una;
805                 pkt->hdr.ack = c->rcv.nxt;
806                 pkt->hdr.ctl = ACK;
807                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
808
809                 if(fin_wanted(c, c->snd.una + len)) {
810                         len--;
811                         pkt->hdr.ctl |= FIN;
812                 }
813
814                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
815                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
816                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
817                 break;
818
819         default:
820                 break;
821         }
822
823         free(pkt);
824 }
825
826 static void retransmit(struct utcp_connection *c) {
827         if(c->state == CLOSED || c->snd.last == c->snd.una) {
828                 debug(c, "retransmit() called but nothing to retransmit!\n");
829                 stop_retransmit_timer(c);
830                 return;
831         }
832
833         struct utcp *utcp = c->utcp;
834
835         struct {
836                 struct hdr hdr;
837                 uint8_t data[];
838         } *pkt = c->utcp->pkt;
839
840         pkt->hdr.src = c->src;
841         pkt->hdr.dst = c->dst;
842         pkt->hdr.wnd = c->rcvbuf.maxsize;
843         pkt->hdr.aux = 0;
844
845         switch(c->state) {
846         case SYN_SENT:
847                 // Send our SYN again
848                 pkt->hdr.seq = c->snd.iss;
849                 pkt->hdr.ack = 0;
850                 pkt->hdr.ctl = SYN;
851                 pkt->hdr.aux = 0x0101;
852                 pkt->data[0] = 1;
853                 pkt->data[1] = 0;
854                 pkt->data[2] = 0;
855                 pkt->data[3] = c->flags & 0x7;
856                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
857                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
858                 break;
859
860         case SYN_RECEIVED:
861                 // Send SYNACK again
862                 pkt->hdr.seq = c->snd.nxt;
863                 pkt->hdr.ack = c->rcv.nxt;
864                 pkt->hdr.ctl = SYN | ACK;
865                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
866                 utcp->send(utcp, pkt, sizeof(pkt->hdr));
867                 break;
868
869         case ESTABLISHED:
870         case FIN_WAIT_1:
871         case CLOSE_WAIT:
872         case CLOSING:
873         case LAST_ACK:
874                 // Send unacked data again.
875                 pkt->hdr.seq = c->snd.una;
876                 pkt->hdr.ack = c->rcv.nxt;
877                 pkt->hdr.ctl = ACK;
878                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
879
880                 if(fin_wanted(c, c->snd.una + len)) {
881                         len--;
882                         pkt->hdr.ctl |= FIN;
883                 }
884
885                 // RFC 5681 slow start after timeout
886                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
887                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
888                 c->snd.cwnd = utcp->mss;
889                 debug_cwnd(c);
890
891                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
892                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
893                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
894
895                 c->snd.nxt = c->snd.una + len;
896                 break;
897
898         case CLOSED:
899         case LISTEN:
900         case TIME_WAIT:
901         case FIN_WAIT_2:
902                 // We shouldn't need to retransmit anything in this state.
903 #ifdef UTCP_DEBUG
904                 abort();
905 #endif
906                 stop_retransmit_timer(c);
907                 goto cleanup;
908         }
909
910         start_retransmit_timer(c);
911         utcp->rto *= 2;
912
913         if(utcp->rto > MAX_RTO) {
914                 utcp->rto = MAX_RTO;
915         }
916
917         c->rtt_start.tv_sec = 0; // invalidate RTT timer
918         c->dupack = 0; // cancel any ongoing fast recovery
919
920 cleanup:
921         return;
922 }
923
924 /* Update receive buffer and SACK entries after consuming data.
925  *
926  * Situation:
927  *
928  * |.....0000..1111111111.....22222......3333|
929  * |---------------^
930  *
931  * 0..3 represent the SACK entries. The ^ indicates up to which point we want
932  * to remove data from the receive buffer. The idea is to substract "len"
933  * from the offset of all the SACK entries, and then remove/cut down entries
934  * that are shifted to before the start of the receive buffer.
935  *
936  * There are three cases:
937  * - the SACK entry is after ^, in that case just change the offset.
938  * - the SACK entry starts before and ends after ^, so we have to
939  *   change both its offset and size.
940  * - the SACK entry is completely before ^, in that case delete it.
941  */
942 static void sack_consume(struct utcp_connection *c, size_t len) {
943         debug(c, "sack_consume %lu\n", (unsigned long)len);
944
945         if(len > c->rcvbuf.used) {
946                 debug(c, "all SACK entries consumed\n");
947                 c->sacks[0].len = 0;
948                 return;
949         }
950
951         buffer_discard(&c->rcvbuf, len);
952
953         for(int i = 0; i < NSACKS && c->sacks[i].len;) {
954                 if(len < c->sacks[i].offset) {
955                         c->sacks[i].offset -= len;
956                         i++;
957                 } else if(len < c->sacks[i].offset + c->sacks[i].len) {
958                         c->sacks[i].len -= len - c->sacks[i].offset;
959                         c->sacks[i].offset = 0;
960                         i++;
961                 } else {
962                         if(i < NSACKS - 1) {
963                                 memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
964                                 c->sacks[NSACKS - 1].len = 0;
965                         } else {
966                                 c->sacks[i].len = 0;
967                                 break;
968                         }
969                 }
970         }
971
972         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
973                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
974         }
975 }
976
977 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
978         debug(c, "out of order packet, offset %u\n", offset);
979         // Packet loss or reordering occured. Store the data in the buffer.
980         ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
981
982         if(rxd < 0 || (size_t)rxd < len) {
983                 abort();
984         }
985
986         // Make note of where we put it.
987         for(int i = 0; i < NSACKS; i++) {
988                 if(!c->sacks[i].len) { // nothing to merge, add new entry
989                         debug(c, "new SACK entry %d\n", i);
990                         c->sacks[i].offset = offset;
991                         c->sacks[i].len = rxd;
992                         break;
993                 } else if(offset < c->sacks[i].offset) {
994                         if(offset + rxd < c->sacks[i].offset) { // insert before
995                                 if(!c->sacks[NSACKS - 1].len) { // only if room left
996                                         debug(c, "insert SACK entry at %d\n", i);
997                                         memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
998                                         c->sacks[i].offset = offset;
999                                         c->sacks[i].len = rxd;
1000                                 } else {
1001                                         debug(c, "SACK entries full, dropping packet\n");
1002                                 }
1003
1004                                 break;
1005                         } else { // merge
1006                                 debug(c, "merge with start of SACK entry at %d\n", i);
1007                                 c->sacks[i].offset = offset;
1008                                 break;
1009                         }
1010                 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1011                         if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1012                                 debug(c, "merge with end of SACK entry at %d\n", i);
1013                                 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1014                                 // TODO: handle potential merge with next entry
1015                         }
1016
1017                         break;
1018                 }
1019         }
1020
1021         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1022                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1023         }
1024 }
1025
1026 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1027         // Check if we can process out-of-order data now.
1028         if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK
1029                 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1030                 buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value
1031                 len = max(len, c->sacks[0].offset + c->sacks[0].len);
1032                 data = c->rcvbuf.data;
1033         }
1034
1035         if(c->recv) {
1036                 ssize_t rxd = c->recv(c, data, len);
1037
1038                 if(rxd < 0 || (size_t)rxd != len) {
1039                         // TODO: handle the application not accepting all data.
1040                         abort();
1041                 }
1042         }
1043
1044         if(c->rcvbuf.used) {
1045                 sack_consume(c, len);
1046         }
1047
1048         c->rcv.nxt += len;
1049 }
1050
1051
1052 static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) {
1053         if(!is_reliable(c)) {
1054                 c->recv(c, data, len);
1055                 c->rcv.nxt = seq + len;
1056                 return;
1057         }
1058
1059         uint32_t offset = seqdiff(seq, c->rcv.nxt);
1060
1061         if(offset + len > c->rcvbuf.maxsize) {
1062                 abort();
1063         }
1064
1065         if(offset) {
1066                 handle_out_of_order(c, offset, data, len);
1067         } else {
1068                 handle_in_order(c, data, len);
1069         }
1070 }
1071
1072
1073 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1074         const uint8_t *ptr = data;
1075
1076         if(!utcp) {
1077                 errno = EFAULT;
1078                 return -1;
1079         }
1080
1081         if(!len) {
1082                 return 0;
1083         }
1084
1085         if(!data) {
1086                 errno = EFAULT;
1087                 return -1;
1088         }
1089
1090         // Drop packets smaller than the header
1091
1092         struct hdr hdr;
1093
1094         if(len < sizeof(hdr)) {
1095                 print_packet(NULL, "recv", data, len);
1096                 errno = EBADMSG;
1097                 return -1;
1098         }
1099
1100         // Make a copy from the potentially unaligned data to a struct hdr
1101
1102         memcpy(&hdr, ptr, sizeof(hdr));
1103
1104         // Try to match the packet to an existing connection
1105
1106         struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1107         print_packet(c, "recv", data, len);
1108
1109         // Process the header
1110
1111         ptr += sizeof(hdr);
1112         len -= sizeof(hdr);
1113
1114         // Drop packets with an unknown CTL flag
1115
1116         if(hdr.ctl & ~(SYN | ACK | RST | FIN)) {
1117                 print_packet(NULL, "recv", data, len);
1118                 errno = EBADMSG;
1119                 return -1;
1120         }
1121
1122         // Check for auxiliary headers
1123
1124         const uint8_t *init = NULL;
1125
1126         uint16_t aux = hdr.aux;
1127
1128         while(aux) {
1129                 size_t auxlen = 4 * (aux >> 8) & 0xf;
1130                 uint8_t auxtype = aux & 0xff;
1131
1132                 if(len < auxlen) {
1133                         errno = EBADMSG;
1134                         return -1;
1135                 }
1136
1137                 switch(auxtype) {
1138                 case AUX_INIT:
1139                         if(!(hdr.ctl & SYN) || auxlen != 4) {
1140                                 errno = EBADMSG;
1141                                 return -1;
1142                         }
1143
1144                         init = ptr;
1145                         break;
1146
1147                 default:
1148                         errno = EBADMSG;
1149                         return -1;
1150                 }
1151
1152                 len -= auxlen;
1153                 ptr += auxlen;
1154
1155                 if(!(aux & 0x800)) {
1156                         break;
1157                 }
1158
1159                 if(len < 2) {
1160                         errno = EBADMSG;
1161                         return -1;
1162                 }
1163
1164                 memcpy(&aux, ptr, 2);
1165                 len -= 2;
1166                 ptr += 2;
1167         }
1168
1169         bool has_data = len || (hdr.ctl & (SYN | FIN));
1170
1171         // Is it for a new connection?
1172
1173         if(!c) {
1174                 // Ignore RST packets
1175
1176                 if(hdr.ctl & RST) {
1177                         return 0;
1178                 }
1179
1180                 // Is it a SYN packet and are we LISTENing?
1181
1182                 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1183                         // If we don't want to accept it, send a RST back
1184                         if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1185                                 len = 1;
1186                                 goto reset;
1187                         }
1188
1189                         // Try to allocate memory, otherwise send a RST back
1190                         c = allocate_connection(utcp, hdr.dst, hdr.src);
1191
1192                         if(!c) {
1193                                 len = 1;
1194                                 goto reset;
1195                         }
1196
1197                         // Parse auxilliary information
1198                         if(init) {
1199                                 if(init[0] < 1) {
1200                                         len = 1;
1201                                         goto reset;
1202                                 }
1203
1204                                 c->flags = init[3] & 0x7;
1205                         } else {
1206                                 c->flags = UTCP_TCP;
1207                         }
1208
1209 synack:
1210                         // Return SYN+ACK, go to SYN_RECEIVED state
1211                         c->snd.wnd = hdr.wnd;
1212                         c->rcv.irs = hdr.seq;
1213                         c->rcv.nxt = c->rcv.irs + 1;
1214                         set_state(c, SYN_RECEIVED);
1215
1216                         struct {
1217                                 struct hdr hdr;
1218                                 uint8_t data[4];
1219                         } pkt;
1220
1221                         pkt.hdr.src = c->src;
1222                         pkt.hdr.dst = c->dst;
1223                         pkt.hdr.ack = c->rcv.irs + 1;
1224                         pkt.hdr.seq = c->snd.iss;
1225                         pkt.hdr.wnd = c->rcvbuf.maxsize;
1226                         pkt.hdr.ctl = SYN | ACK;
1227
1228                         if(init) {
1229                                 pkt.hdr.aux = 0x0101;
1230                                 pkt.data[0] = 1;
1231                                 pkt.data[1] = 0;
1232                                 pkt.data[2] = 0;
1233                                 pkt.data[3] = c->flags & 0x7;
1234                                 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1235                                 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1236                         } else {
1237                                 pkt.hdr.aux = 0;
1238                                 print_packet(c, "send", &pkt, sizeof(hdr));
1239                                 utcp->send(utcp, &pkt, sizeof(hdr));
1240                         }
1241                 } else {
1242                         // No, we don't want your packets, send a RST back
1243                         len = 1;
1244                         goto reset;
1245                 }
1246
1247                 return 0;
1248         }
1249
1250         debug(c, "state %s\n", strstate[c->state]);
1251
1252         // In case this is for a CLOSED connection, ignore the packet.
1253         // TODO: make it so incoming packets can never match a CLOSED connection.
1254
1255         if(c->state == CLOSED) {
1256                 debug(c, "got packet for closed connection\n");
1257                 return 0;
1258         }
1259
1260         // It is for an existing connection.
1261
1262         // 1. Drop invalid packets.
1263
1264         // 1a. Drop packets that should not happen in our current state.
1265
1266         switch(c->state) {
1267         case SYN_SENT:
1268         case SYN_RECEIVED:
1269         case ESTABLISHED:
1270         case FIN_WAIT_1:
1271         case FIN_WAIT_2:
1272         case CLOSE_WAIT:
1273         case CLOSING:
1274         case LAST_ACK:
1275         case TIME_WAIT:
1276                 break;
1277
1278         default:
1279 #ifdef UTCP_DEBUG
1280                 abort();
1281 #endif
1282                 break;
1283         }
1284
1285         // 1b. Discard data that is not in our receive window.
1286
1287         if(is_reliable(c)) {
1288                 bool acceptable;
1289
1290                 if(c->state == SYN_SENT) {
1291                         acceptable = true;
1292                 } else if(len == 0) {
1293                         acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1294                 } else {
1295                         int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1296
1297                         // cut already accepted front overlapping
1298                         if(rcv_offset < 0) {
1299                                 acceptable = len > (size_t) - rcv_offset;
1300
1301                                 if(acceptable) {
1302                                         ptr -= rcv_offset;
1303                                         len += rcv_offset;
1304                                         hdr.seq -= rcv_offset;
1305                                 }
1306                         } else {
1307                                 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1308                         }
1309                 }
1310
1311                 if(!acceptable) {
1312                         debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1313
1314                         // Ignore unacceptable RST packets.
1315                         if(hdr.ctl & RST) {
1316                                 return 0;
1317                         }
1318
1319                         // Otherwise, continue processing.
1320                         len = 0;
1321                 }
1322         } else {
1323 #if UTCP_DEBUG
1324                 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1325
1326                 if(rcv_offset) {
1327                         debug(c, "packet out of order, offset %u bytes", rcv_offset);
1328                 }
1329
1330                 if(rcv_offset >= 0) {
1331                         c->rcv.nxt = hdr.seq + len;
1332                 }
1333
1334 #endif
1335         }
1336
1337         c->snd.wnd = hdr.wnd; // TODO: move below
1338
1339         // 1c. Drop packets with an invalid ACK.
1340         // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1341         // (= snd.una + c->sndbuf.used).
1342
1343         if(!is_reliable(c)) {
1344                 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1345                         hdr.ack = c->snd.una;
1346                 }
1347         }
1348
1349         if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1350                 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1351
1352                 // Ignore unacceptable RST packets.
1353                 if(hdr.ctl & RST) {
1354                         return 0;
1355                 }
1356
1357                 goto reset;
1358         }
1359
1360         // 2. Handle RST packets
1361
1362         if(hdr.ctl & RST) {
1363                 switch(c->state) {
1364                 case SYN_SENT:
1365                         if(!(hdr.ctl & ACK)) {
1366                                 return 0;
1367                         }
1368
1369                         // The peer has refused our connection.
1370                         set_state(c, CLOSED);
1371                         errno = ECONNREFUSED;
1372
1373                         if(c->recv) {
1374                                 c->recv(c, NULL, 0);
1375                         }
1376
1377                         if(c->poll && !c->reapable) {
1378                                 c->poll(c, 0);
1379                         }
1380
1381                         return 0;
1382
1383                 case SYN_RECEIVED:
1384                         if(hdr.ctl & ACK) {
1385                                 return 0;
1386                         }
1387
1388                         // We haven't told the application about this connection yet. Silently delete.
1389                         free_connection(c);
1390                         return 0;
1391
1392                 case ESTABLISHED:
1393                 case FIN_WAIT_1:
1394                 case FIN_WAIT_2:
1395                 case CLOSE_WAIT:
1396                         if(hdr.ctl & ACK) {
1397                                 return 0;
1398                         }
1399
1400                         // The peer has aborted our connection.
1401                         set_state(c, CLOSED);
1402                         errno = ECONNRESET;
1403
1404                         if(c->recv) {
1405                                 c->recv(c, NULL, 0);
1406                         }
1407
1408                         if(c->poll && !c->reapable) {
1409                                 c->poll(c, 0);
1410                         }
1411
1412                         return 0;
1413
1414                 case CLOSING:
1415                 case LAST_ACK:
1416                 case TIME_WAIT:
1417                         if(hdr.ctl & ACK) {
1418                                 return 0;
1419                         }
1420
1421                         // As far as the application is concerned, the connection has already been closed.
1422                         // If it has called utcp_close() already, we can immediately free this connection.
1423                         if(c->reapable) {
1424                                 free_connection(c);
1425                                 return 0;
1426                         }
1427
1428                         // Otherwise, immediately move to the CLOSED state.
1429                         set_state(c, CLOSED);
1430                         return 0;
1431
1432                 default:
1433 #ifdef UTCP_DEBUG
1434                         abort();
1435 #endif
1436                         break;
1437                 }
1438         }
1439
1440         uint32_t advanced;
1441
1442         if(!(hdr.ctl & ACK)) {
1443                 advanced = 0;
1444                 goto skip_ack;
1445         }
1446
1447         // 3. Advance snd.una
1448
1449         advanced = seqdiff(hdr.ack, c->snd.una);
1450
1451         if(advanced) {
1452                 // RTT measurement
1453                 if(c->rtt_start.tv_sec) {
1454                         if(c->rtt_seq == hdr.ack) {
1455                                 struct timespec now;
1456                                 clock_gettime(UTCP_CLOCK, &now);
1457                                 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1458                                 update_rtt(c, diff);
1459                                 c->rtt_start.tv_sec = 0;
1460                         } else if(c->rtt_seq < hdr.ack) {
1461                                 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1462                                 c->rtt_start.tv_sec = 0;
1463                         }
1464                 }
1465
1466                 int32_t data_acked = advanced;
1467
1468                 switch(c->state) {
1469                 case SYN_SENT:
1470                 case SYN_RECEIVED:
1471                         data_acked--;
1472                         break;
1473
1474                 // TODO: handle FIN as well.
1475                 default:
1476                         break;
1477                 }
1478
1479                 assert(data_acked >= 0);
1480
1481 #ifndef NDEBUG
1482                 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1483                 assert(data_acked <= bufused);
1484 #endif
1485
1486                 if(data_acked) {
1487                         buffer_discard(&c->sndbuf, data_acked);
1488                         c->do_poll = true;
1489                 }
1490
1491                 // Also advance snd.nxt if possible
1492                 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1493                         c->snd.nxt = hdr.ack;
1494                 }
1495
1496                 c->snd.una = hdr.ack;
1497
1498                 if(c->dupack) {
1499                         if(c->dupack >= 3) {
1500                                 debug(c, "fast recovery ended\n");
1501                                 c->snd.cwnd = c->snd.ssthresh;
1502                         }
1503
1504                         c->dupack = 0;
1505                 }
1506
1507                 // Increase the congestion window according to RFC 5681
1508                 if(c->snd.cwnd < c->snd.ssthresh) {
1509                         c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1510                 } else {
1511                         c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1512                 }
1513
1514                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1515                         c->snd.cwnd = c->sndbuf.maxsize;
1516                 }
1517
1518                 debug_cwnd(c);
1519
1520                 // Check if we have sent a FIN that is now ACKed.
1521                 switch(c->state) {
1522                 case FIN_WAIT_1:
1523                         if(c->snd.una == c->snd.last) {
1524                                 set_state(c, FIN_WAIT_2);
1525                         }
1526
1527                         break;
1528
1529                 case CLOSING:
1530                         if(c->snd.una == c->snd.last) {
1531                                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1532                                 c->conn_timeout.tv_sec += utcp->timeout;
1533                                 set_state(c, TIME_WAIT);
1534                         }
1535
1536                         break;
1537
1538                 default:
1539                         break;
1540                 }
1541         } else {
1542                 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1543                         c->dupack++;
1544                         debug(c, "duplicate ACK %d\n", c->dupack);
1545
1546                         if(c->dupack == 3) {
1547                                 // RFC 5681 fast recovery
1548                                 debug(c, "fast recovery started\n", c->dupack);
1549                                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1550                                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1551                                 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1552
1553                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1554                                         c->snd.cwnd = c->sndbuf.maxsize;
1555                                 }
1556
1557                                 debug_cwnd(c);
1558
1559                                 fast_retransmit(c);
1560                         } else if(c->dupack > 3) {
1561                                 c->snd.cwnd += utcp->mss;
1562
1563                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1564                                         c->snd.cwnd = c->sndbuf.maxsize;
1565                                 }
1566
1567                                 debug_cwnd(c);
1568                         }
1569
1570                         // We got an ACK which indicates the other side did get one of our packets.
1571                         // Reset the retransmission timer to avoid going to slow start,
1572                         // but don't touch the connection timeout.
1573                         start_retransmit_timer(c);
1574                 }
1575         }
1576
1577         // 4. Update timers
1578
1579         if(advanced) {
1580                 if(c->snd.una == c->snd.last) {
1581                         stop_retransmit_timer(c);
1582                         timespec_clear(&c->conn_timeout);
1583                 } else if(is_reliable(c)) {
1584                         start_retransmit_timer(c);
1585                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1586                         c->conn_timeout.tv_sec += utcp->timeout;
1587                 }
1588         }
1589
1590 skip_ack:
1591         // 5. Process SYN stuff
1592
1593         if(hdr.ctl & SYN) {
1594                 switch(c->state) {
1595                 case SYN_SENT:
1596
1597                         // This is a SYNACK. It should always have ACKed the SYN.
1598                         if(!advanced) {
1599                                 goto reset;
1600                         }
1601
1602                         c->rcv.irs = hdr.seq;
1603                         c->rcv.nxt = hdr.seq;
1604
1605                         if(c->shut_wr) {
1606                                 c->snd.last++;
1607                                 set_state(c, FIN_WAIT_1);
1608                         } else {
1609                                 set_state(c, ESTABLISHED);
1610                         }
1611
1612                         // TODO: notify application of this somehow.
1613                         break;
1614
1615                 case SYN_RECEIVED:
1616                         // This is a retransmit of a SYN, send back the SYNACK.
1617                         goto synack;
1618
1619                 case ESTABLISHED:
1620                 case FIN_WAIT_1:
1621                 case FIN_WAIT_2:
1622                 case CLOSE_WAIT:
1623                 case CLOSING:
1624                 case LAST_ACK:
1625                 case TIME_WAIT:
1626                         // Ehm, no. We should never receive a second SYN.
1627                         return 0;
1628
1629                 default:
1630 #ifdef UTCP_DEBUG
1631                         abort();
1632 #endif
1633                         return 0;
1634                 }
1635
1636                 // SYN counts as one sequence number
1637                 c->rcv.nxt++;
1638         }
1639
1640         // 6. Process new data
1641
1642         if(c->state == SYN_RECEIVED) {
1643                 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
1644                 if(!advanced) {
1645                         goto reset;
1646                 }
1647
1648                 // Are we still LISTENing?
1649                 if(utcp->accept) {
1650                         utcp->accept(c, c->src);
1651                 }
1652
1653                 if(c->state != ESTABLISHED) {
1654                         set_state(c, CLOSED);
1655                         c->reapable = true;
1656                         goto reset;
1657                 }
1658         }
1659
1660         if(len) {
1661                 switch(c->state) {
1662                 case SYN_SENT:
1663                 case SYN_RECEIVED:
1664                         // This should never happen.
1665 #ifdef UTCP_DEBUG
1666                         abort();
1667 #endif
1668                         return 0;
1669
1670                 case ESTABLISHED:
1671                 case FIN_WAIT_1:
1672                 case FIN_WAIT_2:
1673                         break;
1674
1675                 case CLOSE_WAIT:
1676                 case CLOSING:
1677                 case LAST_ACK:
1678                 case TIME_WAIT:
1679                         // Ehm no, We should never receive more data after a FIN.
1680                         goto reset;
1681
1682                 default:
1683 #ifdef UTCP_DEBUG
1684                         abort();
1685 #endif
1686                         return 0;
1687                 }
1688
1689                 handle_incoming_data(c, hdr.seq, ptr, len);
1690         }
1691
1692         // 7. Process FIN stuff
1693
1694         if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
1695                 switch(c->state) {
1696                 case SYN_SENT:
1697                 case SYN_RECEIVED:
1698                         // This should never happen.
1699 #ifdef UTCP_DEBUG
1700                         abort();
1701 #endif
1702                         break;
1703
1704                 case ESTABLISHED:
1705                         set_state(c, CLOSE_WAIT);
1706                         break;
1707
1708                 case FIN_WAIT_1:
1709                         set_state(c, CLOSING);
1710                         break;
1711
1712                 case FIN_WAIT_2:
1713                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1714                         c->conn_timeout.tv_sec += utcp->timeout;
1715                         set_state(c, TIME_WAIT);
1716                         break;
1717
1718                 case CLOSE_WAIT:
1719                 case CLOSING:
1720                 case LAST_ACK:
1721                 case TIME_WAIT:
1722                         // Ehm, no. We should never receive a second FIN.
1723                         goto reset;
1724
1725                 default:
1726 #ifdef UTCP_DEBUG
1727                         abort();
1728 #endif
1729                         break;
1730                 }
1731
1732                 // FIN counts as one sequence number
1733                 c->rcv.nxt++;
1734                 len++;
1735
1736                 // Inform the application that the peer closed its end of the connection.
1737                 if(c->recv) {
1738                         errno = 0;
1739                         c->recv(c, NULL, 0);
1740                 }
1741         }
1742
1743         // Now we send something back if:
1744         // - we received data, so we have to send back an ACK
1745         //   -> sendatleastone = true
1746         // - or we got an ack, so we should maybe send a bit more data
1747         //   -> sendatleastone = false
1748
1749         if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
1750                 ack(c, has_data);
1751         }
1752
1753         return 0;
1754
1755 reset:
1756         swap_ports(&hdr);
1757         hdr.wnd = 0;
1758         hdr.aux = 0;
1759
1760         if(hdr.ctl & ACK) {
1761                 hdr.seq = hdr.ack;
1762                 hdr.ctl = RST;
1763         } else {
1764                 hdr.ack = hdr.seq + len;
1765                 hdr.seq = 0;
1766                 hdr.ctl = RST | ACK;
1767         }
1768
1769         print_packet(c, "send", &hdr, sizeof(hdr));
1770         utcp->send(utcp, &hdr, sizeof(hdr));
1771         return 0;
1772
1773 }
1774
1775 int utcp_shutdown(struct utcp_connection *c, int dir) {
1776         debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
1777
1778         if(!c) {
1779                 errno = EFAULT;
1780                 return -1;
1781         }
1782
1783         if(c->reapable) {
1784                 debug(c, "shutdown() called on closed connection\n");
1785                 errno = EBADF;
1786                 return -1;
1787         }
1788
1789         if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
1790                 errno = EINVAL;
1791                 return -1;
1792         }
1793
1794         // TCP does not have a provision for stopping incoming packets.
1795         // The best we can do is to just ignore them.
1796         if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
1797                 c->recv = NULL;
1798         }
1799
1800         // The rest of the code deals with shutting down writes.
1801         if(dir == UTCP_SHUT_RD) {
1802                 return 0;
1803         }
1804
1805         // Only process shutting down writes once.
1806         if(c->shut_wr) {
1807                 return 0;
1808         }
1809
1810         c->shut_wr = true;
1811
1812         switch(c->state) {
1813         case CLOSED:
1814         case LISTEN:
1815                 errno = ENOTCONN;
1816                 return -1;
1817
1818         case SYN_SENT:
1819                 return 0;
1820
1821         case SYN_RECEIVED:
1822         case ESTABLISHED:
1823                 set_state(c, FIN_WAIT_1);
1824                 break;
1825
1826         case FIN_WAIT_1:
1827         case FIN_WAIT_2:
1828                 return 0;
1829
1830         case CLOSE_WAIT:
1831                 set_state(c, CLOSING);
1832                 break;
1833
1834         case CLOSING:
1835         case LAST_ACK:
1836         case TIME_WAIT:
1837                 return 0;
1838         }
1839
1840         c->snd.last++;
1841
1842         ack(c, false);
1843
1844         if(!timespec_isset(&c->rtrx_timeout)) {
1845                 start_retransmit_timer(c);
1846         }
1847
1848         return 0;
1849 }
1850
1851 static bool reset_connection(struct utcp_connection *c) {
1852         if(!c) {
1853                 errno = EFAULT;
1854                 return false;
1855         }
1856
1857         if(c->reapable) {
1858                 debug(c, "abort() called on closed connection\n");
1859                 errno = EBADF;
1860                 return false;
1861         }
1862
1863         c->recv = NULL;
1864         c->poll = NULL;
1865
1866         switch(c->state) {
1867         case CLOSED:
1868                 return true;
1869
1870         case LISTEN:
1871         case SYN_SENT:
1872         case CLOSING:
1873         case LAST_ACK:
1874         case TIME_WAIT:
1875                 set_state(c, CLOSED);
1876                 return true;
1877
1878         case SYN_RECEIVED:
1879         case ESTABLISHED:
1880         case FIN_WAIT_1:
1881         case FIN_WAIT_2:
1882         case CLOSE_WAIT:
1883                 set_state(c, CLOSED);
1884                 break;
1885         }
1886
1887         // Send RST
1888
1889         struct hdr hdr;
1890
1891         hdr.src = c->src;
1892         hdr.dst = c->dst;
1893         hdr.seq = c->snd.nxt;
1894         hdr.ack = 0;
1895         hdr.wnd = 0;
1896         hdr.ctl = RST;
1897
1898         print_packet(c, "send", &hdr, sizeof(hdr));
1899         c->utcp->send(c->utcp, &hdr, sizeof(hdr));
1900         return true;
1901 }
1902
1903 // Closes all the opened connections
1904 void utcp_abort_all_connections(struct utcp *utcp) {
1905         if(!utcp) {
1906                 errno = EINVAL;
1907                 return;
1908         }
1909
1910         for(int i = 0; i < utcp->nconnections; i++) {
1911                 struct utcp_connection *c = utcp->connections[i];
1912
1913                 if(c->reapable || c->state == CLOSED) {
1914                         continue;
1915                 }
1916
1917                 utcp_recv_t old_recv = c->recv;
1918                 utcp_poll_t old_poll = c->poll;
1919
1920                 reset_connection(c);
1921
1922                 if(old_recv) {
1923                         errno = 0;
1924                         old_recv(c, NULL, 0);
1925                 }
1926
1927                 if(old_poll && !c->reapable) {
1928                         errno = 0;
1929                         old_poll(c, 0);
1930                 }
1931         }
1932
1933         return;
1934 }
1935
1936 int utcp_close(struct utcp_connection *c) {
1937         if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
1938                 return -1;
1939         }
1940
1941         c->recv = NULL;
1942         c->poll = NULL;
1943         c->reapable = true;
1944         return 0;
1945 }
1946
1947 int utcp_abort(struct utcp_connection *c) {
1948         if(!reset_connection(c)) {
1949                 return -1;
1950         }
1951
1952         c->reapable = true;
1953         return 0;
1954 }
1955
1956 /* Handle timeouts.
1957  * One call to this function will loop through all connections,
1958  * checking if something needs to be resent or not.
1959  * The return value is the time to the next timeout in milliseconds,
1960  * or maybe a negative value if the timeout is infinite.
1961  */
1962 struct timespec utcp_timeout(struct utcp *utcp) {
1963         struct timespec now;
1964         clock_gettime(UTCP_CLOCK, &now);
1965         struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
1966
1967         for(int i = 0; i < utcp->nconnections; i++) {
1968                 struct utcp_connection *c = utcp->connections[i];
1969
1970                 if(!c) {
1971                         continue;
1972                 }
1973
1974                 // delete connections that have been utcp_close()d.
1975                 if(c->state == CLOSED) {
1976                         if(c->reapable) {
1977                                 debug(c, "reaping\n");
1978                                 free_connection(c);
1979                                 i--;
1980                         }
1981
1982                         continue;
1983                 }
1984
1985                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
1986                         errno = ETIMEDOUT;
1987                         c->state = CLOSED;
1988
1989                         if(c->recv) {
1990                                 c->recv(c, NULL, 0);
1991                         }
1992
1993                         if(c->poll && !c->reapable) {
1994                                 c->poll(c, 0);
1995                         }
1996
1997                         continue;
1998                 }
1999
2000                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2001                         debug(c, "retransmitting after timeout\n");
2002                         retransmit(c);
2003                 }
2004
2005                 if(c->poll) {
2006                         if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2007                                 c->do_poll = false;
2008                                 uint32_t len = buffer_free(&c->sndbuf);
2009
2010                                 if(len) {
2011                                         c->poll(c, len);
2012                                 }
2013                         } else if(c->state == CLOSED) {
2014                                 c->poll(c, 0);
2015                         }
2016                 }
2017
2018                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2019                         next = c->conn_timeout;
2020                 }
2021
2022                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2023                         next = c->rtrx_timeout;
2024                 }
2025         }
2026
2027         struct timespec diff;
2028
2029         timespec_sub(&next, &now, &diff);
2030
2031         return diff;
2032 }
2033
2034 bool utcp_is_active(struct utcp *utcp) {
2035         if(!utcp) {
2036                 return false;
2037         }
2038
2039         for(int i = 0; i < utcp->nconnections; i++)
2040                 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
2041                         return true;
2042                 }
2043
2044         return false;
2045 }
2046
2047 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2048         if(!send) {
2049                 errno = EFAULT;
2050                 return NULL;
2051         }
2052
2053         struct utcp *utcp = calloc(1, sizeof(*utcp));
2054
2055         if(!utcp) {
2056                 return NULL;
2057         }
2058
2059         if(!CLOCK_GRANULARITY) {
2060                 struct timespec res;
2061                 clock_getres(UTCP_CLOCK, &res);
2062                 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2063         }
2064
2065         utcp->accept = accept;
2066         utcp->pre_accept = pre_accept;
2067         utcp->send = send;
2068         utcp->priv = priv;
2069         utcp_set_mtu(utcp, DEFAULT_MTU);
2070         utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
2071         utcp->rto = START_RTO; // usec
2072
2073         return utcp;
2074 }
2075
2076 void utcp_exit(struct utcp *utcp) {
2077         if(!utcp) {
2078                 return;
2079         }
2080
2081         for(int i = 0; i < utcp->nconnections; i++) {
2082                 struct utcp_connection *c = utcp->connections[i];
2083
2084                 if(!c->reapable) {
2085                         if(c->recv) {
2086                                 c->recv(c, NULL, 0);
2087                         }
2088
2089                         if(c->poll && !c->reapable) {
2090                                 c->poll(c, 0);
2091                         }
2092                 }
2093
2094                 buffer_exit(&c->rcvbuf);
2095                 buffer_exit(&c->sndbuf);
2096                 free(c);
2097         }
2098
2099         free(utcp->connections);
2100         free(utcp);
2101 }
2102
2103 uint16_t utcp_get_mtu(struct utcp *utcp) {
2104         return utcp ? utcp->mtu : 0;
2105 }
2106
2107 uint16_t utcp_get_mss(struct utcp *utcp) {
2108         return utcp ? utcp->mss : 0;
2109 }
2110
2111 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2112         if(!utcp) {
2113                 return;
2114         }
2115
2116         if(mtu <= sizeof(struct hdr)) {
2117                 return;
2118         }
2119
2120         if(mtu > utcp->mtu) {
2121                 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2122
2123                 if(!new) {
2124                         return;
2125                 }
2126
2127                 utcp->pkt = new;
2128         }
2129
2130         utcp->mtu = mtu;
2131         utcp->mss = mtu - sizeof(struct hdr);
2132 }
2133
2134 void utcp_reset_timers(struct utcp *utcp) {
2135         if(!utcp) {
2136                 return;
2137         }
2138
2139         struct timespec now, then;
2140
2141         clock_gettime(UTCP_CLOCK, &now);
2142
2143         then = now;
2144
2145         then.tv_sec += utcp->timeout;
2146
2147         for(int i = 0; i < utcp->nconnections; i++) {
2148                 struct utcp_connection *c = utcp->connections[i];
2149
2150                 if(c->reapable) {
2151                         continue;
2152                 }
2153
2154                 if(timespec_isset(&c->rtrx_timeout)) {
2155                         c->rtrx_timeout = now;
2156                 }
2157
2158                 if(timespec_isset(&c->conn_timeout)) {
2159                         c->conn_timeout = then;
2160                 }
2161
2162                 c->rtt_start.tv_sec = 0;
2163         }
2164
2165         if(utcp->rto > START_RTO) {
2166                 utcp->rto = START_RTO;
2167         }
2168 }
2169
2170 int utcp_get_user_timeout(struct utcp *u) {
2171         return u ? u->timeout : 0;
2172 }
2173
2174 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2175         if(u) {
2176                 u->timeout = timeout;
2177         }
2178 }
2179
2180 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2181         return c ? c->sndbuf.maxsize : 0;
2182 }
2183
2184 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2185         if(!c) {
2186                 return 0;
2187         }
2188
2189         switch(c->state) {
2190         case SYN_SENT:
2191         case SYN_RECEIVED:
2192         case ESTABLISHED:
2193         case CLOSE_WAIT:
2194                 return buffer_free(&c->sndbuf);
2195
2196         default:
2197                 return 0;
2198         }
2199 }
2200
2201 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2202         if(!c) {
2203                 return;
2204         }
2205
2206         c->sndbuf.maxsize = size;
2207
2208         if(c->sndbuf.maxsize != size) {
2209                 c->sndbuf.maxsize = -1;
2210         }
2211
2212         c->do_poll = buffer_free(&c->sndbuf);
2213 }
2214
2215 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2216         return c ? c->rcvbuf.maxsize : 0;
2217 }
2218
2219 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2220         if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2221                 return buffer_free(&c->rcvbuf);
2222         } else {
2223                 return 0;
2224         }
2225 }
2226
2227 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2228         if(!c) {
2229                 return;
2230         }
2231
2232         c->rcvbuf.maxsize = size;
2233
2234         if(c->rcvbuf.maxsize != size) {
2235                 c->rcvbuf.maxsize = -1;
2236         }
2237 }
2238
2239 size_t utcp_get_sendq(struct utcp_connection *c) {
2240         return c->sndbuf.used;
2241 }
2242
2243 size_t utcp_get_recvq(struct utcp_connection *c) {
2244         return c->rcvbuf.used;
2245 }
2246
2247 bool utcp_get_nodelay(struct utcp_connection *c) {
2248         return c ? c->nodelay : false;
2249 }
2250
2251 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2252         if(c) {
2253                 c->nodelay = nodelay;
2254         }
2255 }
2256
2257 bool utcp_get_keepalive(struct utcp_connection *c) {
2258         return c ? c->keepalive : false;
2259 }
2260
2261 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2262         if(c) {
2263                 c->keepalive = keepalive;
2264         }
2265 }
2266
2267 size_t utcp_get_outq(struct utcp_connection *c) {
2268         return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
2269 }
2270
2271 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
2272         if(c) {
2273                 c->recv = recv;
2274         }
2275 }
2276
2277 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2278         if(c) {
2279                 c->poll = poll;
2280                 c->do_poll = buffer_free(&c->sndbuf);
2281         }
2282 }
2283
2284 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2285         if(utcp) {
2286                 utcp->accept = accept;
2287                 utcp->pre_accept = pre_accept;
2288         }
2289 }
2290
2291 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2292         if(!c || c->reapable) {
2293                 return;
2294         }
2295
2296         if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2297                 return;
2298         }
2299
2300         if(expect) {
2301                 // If we expect data, start the connection timer.
2302                 if(!timespec_isset(&c->conn_timeout)) {
2303                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2304                         c->conn_timeout.tv_sec += c->utcp->timeout;
2305                 }
2306         } else {
2307                 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2308                 if(c->snd.una == c->snd.last) {
2309                         timespec_clear(&c->conn_timeout);
2310                 }
2311         }
2312 }
2313
2314 void utcp_offline(struct utcp *utcp, bool offline) {
2315         struct timespec now;
2316         clock_gettime(UTCP_CLOCK, &now);
2317
2318         for(int i = 0; i < utcp->nconnections; i++) {
2319                 struct utcp_connection *c = utcp->connections[i];
2320
2321                 if(c->reapable) {
2322                         continue;
2323                 }
2324
2325                 utcp_expect_data(c, offline);
2326
2327                 if(!offline) {
2328                         if(timespec_isset(&c->rtrx_timeout)) {
2329                                 c->rtrx_timeout = now;
2330                         }
2331
2332                         utcp->connections[i]->rtt_start.tv_sec = 0;
2333                 }
2334         }
2335
2336         if(!offline && utcp->rto > START_RTO) {
2337                 utcp->rto = START_RTO;
2338         }
2339 }
2340
2341 void utcp_set_clock_granularity(long granularity) {
2342         CLOCK_GRANULARITY = granularity;
2343 }