]> git.meshlink.io Git - utcp/blob - utcp.c
Fix a memory leak.
[utcp] / utcp.c
1 /*
2     utcp.c -- Userspace TCP
3     Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <time.h>
31
32 #include "utcp_priv.h"
33
34 #ifndef EBADMSG
35 #define EBADMSG         104
36 #endif
37
38 #ifndef SHUT_RDWR
39 #define SHUT_RDWR 2
40 #endif
41
42 #ifdef poll
43 #undef poll
44 #endif
45
46 #ifndef UTCP_CLOCK
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
49 #else
50 #define UTCP_CLOCK CLOCK_MONOTONIC
51 #endif
52 #endif
53
54 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
55         r->tv_sec = a->tv_sec - b->tv_sec;
56         r->tv_nsec = a->tv_nsec - b->tv_nsec;
57
58         if(r->tv_nsec < 0) {
59                 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
60         }
61 }
62
/* Return a - b in microseconds (truncated towards zero).
 * The intermediate is computed in 64-bit nanoseconds; the result is narrowed
 * to int32_t, so it is only meaningful for differences of roughly +/- 35 minutes.
 */
static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
        // Widen before multiplying so a 32-bit time_t cannot overflow.
        int64_t diff = (int64_t)(a->tv_sec - b->tv_sec) * 1000000000 + (a->tv_nsec - b->tv_nsec);
        return diff / 1000;
}
67
/* Return true when a is strictly earlier than b.
 * Seconds decide; nanoseconds only break a tie on equal seconds.
 */
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
        if(a->tv_sec != b->tv_sec) {
                return a->tv_sec < b->tv_sec;
        }

        return a->tv_nsec < b->tv_nsec;
}
75
/* Mark a timestamp as unset by zeroing its seconds field.
 * The nanosecond field is left untouched; timespec_isset() only looks at tv_sec.
 */
static void timespec_clear(struct timespec *a) {
        a->tv_sec = (time_t)0;
}
79
/* Return true when the timestamp has a non-zero seconds field. */
static bool timespec_isset(const struct timespec *a) {
        return a->tv_sec != 0;
}
83
84 static long CLOCK_GRANULARITY; // usec
85
/* Return the smaller of two sizes. */
static inline size_t min(size_t a, size_t b) {
        if(a < b) {
                return a;
        }

        return b;
}
89
/* Return the larger of two sizes. */
static inline size_t max(size_t a, size_t b) {
        if(a > b) {
                return a;
        }

        return b;
}
93
94 #ifdef UTCP_DEBUG
95 #include <stdarg.h>
96
97 #ifndef UTCP_DEBUG_DATALEN
98 #define UTCP_DEBUG_DATALEN 20
99 #endif
100
101 static void debug(struct utcp_connection *c, const char *format, ...) {
102         struct timespec tv;
103         char buf[1024];
104         int len;
105
106         clock_gettime(CLOCK_REALTIME, &tv);
107         len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);
108         va_list ap;
109         va_start(ap, format);
110         len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
111         va_end(ap);
112
113         if(len > 0 && (size_t)len < sizeof(buf)) {
114                 fwrite(buf, len, 1, stderr);
115         }
116 }
117
118 static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
119         struct hdr hdr;
120
121         if(len < sizeof(hdr)) {
122                 debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
123                 return;
124         }
125
126         memcpy(&hdr, pkt, sizeof(hdr));
127
128         uint32_t datalen;
129
130         if(len > sizeof(hdr)) {
131                 datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
132         } else {
133                 datalen = 0;
134         }
135
136
137         const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
138         char str[datalen * 2 + 1];
139         char *p = str;
140
141         for(uint32_t i = 0; i < datalen; i++) {
142                 *p++ = "0123456789ABCDEF"[data[i] >> 4];
143                 *p++ = "0123456789ABCDEF"[data[i] & 15];
144         }
145
146         *p = 0;
147
148         debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s data %s\n",
149               dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
150               hdr.ctl & SYN ? "SYN" : "",
151               hdr.ctl & RST ? "RST" : "",
152               hdr.ctl & FIN ? "FIN" : "",
153               hdr.ctl & ACK ? "ACK" : "",
154               str
155              );
156 }
157
158 static void debug_cwnd(struct utcp_connection *c) {
159         debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
160 }
161 #else
162 #define debug(...) do {} while(0)
163 #define print_packet(...) do {} while(0)
164 #define debug_cwnd(...) do {} while(0)
165 #endif
166
167 static void set_state(struct utcp_connection *c, enum state state) {
168         c->state = state;
169
170         if(state == ESTABLISHED) {
171                 timespec_clear(&c->conn_timeout);
172         }
173
174         debug(c, "state %s\n", strstate[state]);
175 }
176
177 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
178         if(seq != c->snd.last) {
179                 return false;
180         }
181
182         switch(c->state) {
183         case FIN_WAIT_1:
184         case CLOSING:
185         case LAST_ACK:
186                 return true;
187
188         default:
189                 return false;
190         }
191 }
192
193 static bool is_reliable(struct utcp_connection *c) {
194         return c->flags & UTCP_RELIABLE;
195 }
196
/* Signed distance from sequence number b to a.
 * The subtraction wraps modulo 2^32 and is then reinterpreted as signed,
 * so the result is negative when a is "behind" b.
 */
static int32_t seqdiff(uint32_t a, uint32_t b) {
        uint32_t delta = a - b;
        return delta;
}
200
201 // Buffer functions
202 static bool buffer_wraps(struct buffer *buf) {
203         return buf->size - buf->offset < buf->used;
204 }
205
206 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
207         char *newdata = realloc(buf->data, newsize);
208
209         if(!newdata) {
210                 return false;
211         }
212
213         buf->data = newdata;
214
215         if(buffer_wraps(buf)) {
216                 // Shift the right part of the buffer until it hits the end of the new buffer.
217                 // Old situation:
218                 // [345......012]
219                 // New situation:
220                 // [345.........|........012]
221                 uint32_t tailsize = buf->size - buf->offset;
222                 uint32_t newoffset = newsize - tailsize;
223                 memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
224                 buf->offset = newoffset;
225         }
226
227         buf->size = newsize;
228         return true;
229 }
230
// Store len bytes of data into the buffer at logical position offset
// (counted from the start of the stored data, not from the start of storage).
//
// The write is clipped so that offset + len never exceeds buf->maxsize, and
// the storage is grown on demand. Returns the number of bytes stored, 0 when
// offset is at or beyond maxsize, or -1 when growing the storage failed.
static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
        debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);

        // Ensure we don't store more than maxsize bytes in total
        size_t required = offset + len;

        if(required > buf->maxsize) {
                if(offset >= buf->maxsize) {
                        return 0;
                }

                // Clip the write so it ends exactly at maxsize.
                len = buf->maxsize - offset;
                required = buf->maxsize;
        }

        // Check if we need to resize the buffer
        if(required > buf->size) {
                size_t newsize = buf->size;

                if(!newsize) {
                        newsize = 4096;
                }

                // Keep doubling until the request fits; the loop body runs at least once.
                do {
                        newsize *= 2;
                } while(newsize < required);

                if(newsize > buf->maxsize) {
                        newsize = buf->maxsize;
                }

                if(!buffer_resize(buf, newsize)) {
                        return -1;
                }
        }

        // Translate the logical offset into a physical index into buf->data.
        uint32_t realoffset = buf->offset + offset;

        if(buf->size - buf->offset < offset) {
                // The offset wrapped
                realoffset -= buf->size;
        }

        if(buf->size - realoffset < len) {
                // The new chunk of data must be wrapped
                memcpy(buf->data + realoffset, data, buf->size - realoffset);
                memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
        } else {
                memcpy(buf->data + realoffset, data, len);
        }

        // Writing past the current end extends the used region; writing inside it does not.
        if(required > buf->used) {
                buf->used = required;
        }

        return len;
}
289
290 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
291         return buffer_put_at(buf, buf->used, data, len);
292 }
293
294 // Copy data from the buffer without removing it.
295 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
296         // Ensure we don't copy more than is actually stored in the buffer
297         if(offset >= buf->used) {
298                 return 0;
299         }
300
301         if(buf->used - offset < len) {
302                 len = buf->used - offset;
303         }
304
305         uint32_t realoffset = buf->offset + offset;
306
307         if(buf->size - buf->offset < offset) {
308                 // The offset wrapped
309                 realoffset -= buf->size;
310         }
311
312         if(buf->size - realoffset < len) {
313                 // The data is wrapped
314                 memcpy(data, buf->data + realoffset, buf->size - realoffset);
315                 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
316         } else {
317                 memcpy(data, buf->data + realoffset, len);
318         }
319
320         return len;
321 }
322
323 // Copy data from the buffer without removing it.
324 static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) {
325         // Ensure we don't copy more than is actually stored in the buffer
326         if(offset >= buf->used) {
327                 return 0;
328         }
329
330         if(buf->used - offset < len) {
331                 len = buf->used - offset;
332         }
333
334         uint32_t realoffset = buf->offset + offset;
335
336         if(buf->size - buf->offset < offset) {
337                 // The offset wrapped
338                 realoffset -= buf->size;
339         }
340
341         if(buf->size - realoffset < len) {
342                 // The data is wrapped
343                 ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset);
344
345                 if(rx1 < buf->size - realoffset) {
346                         return rx1;
347                 }
348
349                 ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset));
350
351                 if(rx2 < 0) {
352                         return rx2;
353                 } else {
354                         return rx1 + rx2;
355                 }
356         } else {
357                 return cb(arg, buf->data + realoffset, len);
358         }
359 }
360
361 // Discard data from the buffer.
362 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
363         if(buf->used < len) {
364                 len = buf->used;
365         }
366
367         if(buf->size - buf->offset < len) {
368                 buf->offset -= buf->size;
369         }
370
371         buf->offset += len;
372         buf->used -= len;
373
374         return len;
375 }
376
377 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
378         if(maxsize < minsize) {
379                 maxsize = minsize;
380         }
381
382         buf->maxsize = maxsize;
383
384         return buf->size >= minsize || buffer_resize(buf, minsize);
385 }
386
387 static void buffer_exit(struct buffer *buf) {
388         free(buf->data);
389         memset(buf, 0, sizeof(*buf));
390 }
391
392 static uint32_t buffer_free(const struct buffer *buf) {
393         return buf->maxsize - buf->used;
394 }
395
396 // Connections are stored in a sorted list.
397 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
398
399 static int compare(const void *va, const void *vb) {
400         assert(va && vb);
401
402         const struct utcp_connection *a = *(struct utcp_connection **)va;
403         const struct utcp_connection *b = *(struct utcp_connection **)vb;
404
405         assert(a && b);
406         assert(a->src && b->src);
407
408         int c = (int)a->src - (int)b->src;
409
410         if(c) {
411                 return c;
412         }
413
414         c = (int)a->dst - (int)b->dst;
415         return c;
416 }
417
418 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
419         if(!utcp->nconnections) {
420                 return NULL;
421         }
422
423         struct utcp_connection key = {
424                 .src = src,
425                 .dst = dst,
426         }, *keyp = &key;
427         struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
428         return match ? *match : NULL;
429 }
430
431 static void free_connection(struct utcp_connection *c) {
432         struct utcp *utcp = c->utcp;
433         struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
434
435         assert(cp);
436
437         int i = cp - utcp->connections;
438         memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
439         utcp->nconnections--;
440
441         buffer_exit(&c->rcvbuf);
442         buffer_exit(&c->sndbuf);
443         free(c);
444 }
445
446 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
447         // Check whether this combination of src and dst is free
448
449         if(src) {
450                 if(find_connection(utcp, src, dst)) {
451                         errno = EADDRINUSE;
452                         return NULL;
453                 }
454         } else { // If src == 0, generate a random port number with the high bit set
455                 if(utcp->nconnections >= 32767) {
456                         errno = ENOMEM;
457                         return NULL;
458                 }
459
460                 src = rand() | 0x8000;
461
462                 while(find_connection(utcp, src, dst)) {
463                         src++;
464                 }
465         }
466
467         // Allocate memory for the new connection
468
469         if(utcp->nconnections >= utcp->nallocated) {
470                 if(!utcp->nallocated) {
471                         utcp->nallocated = 4;
472                 } else {
473                         utcp->nallocated *= 2;
474                 }
475
476                 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
477
478                 if(!new_array) {
479                         return NULL;
480                 }
481
482                 utcp->connections = new_array;
483         }
484
485         struct utcp_connection *c = calloc(1, sizeof(*c));
486
487         if(!c) {
488                 return NULL;
489         }
490
491         if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
492                 free(c);
493                 return NULL;
494         }
495
496         if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
497                 buffer_exit(&c->sndbuf);
498                 free(c);
499                 return NULL;
500         }
501
502         // Fill in the details
503
504         c->src = src;
505         c->dst = dst;
506 #ifdef UTCP_DEBUG
507         c->snd.iss = 0;
508 #else
509         c->snd.iss = rand();
510 #endif
511         c->snd.una = c->snd.iss;
512         c->snd.nxt = c->snd.iss + 1;
513         c->snd.last = c->snd.nxt;
514         c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
515         c->snd.ssthresh = ~0;
516         debug_cwnd(c);
517         c->utcp = utcp;
518
519         // Add it to the sorted list of connections
520
521         utcp->connections[utcp->nconnections++] = c;
522         qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
523
524         return c;
525 }
526
/* Return the absolute difference |a - b| of two unsigned values. */
static inline uint32_t absdiff(uint32_t a, uint32_t b) {
        return a > b ? a - b : b - a;
}
534
535 // Update RTT variables. See RFC 6298.
536 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
537         if(!rtt) {
538                 debug(c, "invalid rtt\n");
539                 return;
540         }
541
542         struct utcp *utcp = c->utcp;
543
544         if(!utcp->srtt) {
545                 utcp->srtt = rtt;
546                 utcp->rttvar = rtt / 2;
547         } else {
548                 utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4;
549                 utcp->srtt = (utcp->srtt * 7 + rtt) / 8;
550         }
551
552         utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY);
553
554         if(utcp->rto > MAX_RTO) {
555                 utcp->rto = MAX_RTO;
556         }
557
558         debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, utcp->srtt, utcp->rttvar, utcp->rto);
559 }
560
561 static void start_retransmit_timer(struct utcp_connection *c) {
562         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
563
564         uint32_t rto = c->utcp->rto;
565
566         while(rto > USEC_PER_SEC) {
567                 c->rtrx_timeout.tv_sec++;
568                 rto -= USEC_PER_SEC;
569         }
570
571         c->rtrx_timeout.tv_nsec += c->utcp->rto * 1000;
572
573         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
574                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
575                 c->rtrx_timeout.tv_sec++;
576         }
577
578         debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
579 }
580
581 static void stop_retransmit_timer(struct utcp_connection *c) {
582         timespec_clear(&c->rtrx_timeout);
583         debug(c, "rtrx_timeout cleared\n");
584 }
585
586 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
587         struct utcp_connection *c = allocate_connection(utcp, 0, dst);
588
589         if(!c) {
590                 return NULL;
591         }
592
593         assert((flags & ~0x1f) == 0);
594
595         c->flags = flags;
596         c->recv = recv;
597         c->priv = priv;
598
599         struct {
600                 struct hdr hdr;
601                 uint8_t init[4];
602         } pkt;
603
604         pkt.hdr.src = c->src;
605         pkt.hdr.dst = c->dst;
606         pkt.hdr.seq = c->snd.iss;
607         pkt.hdr.ack = 0;
608         pkt.hdr.wnd = c->rcvbuf.maxsize;
609         pkt.hdr.ctl = SYN;
610         pkt.hdr.aux = 0x0101;
611         pkt.init[0] = 1;
612         pkt.init[1] = 0;
613         pkt.init[2] = 0;
614         pkt.init[3] = flags & 0x7;
615
616         set_state(c, SYN_SENT);
617
618         print_packet(c, "send", &pkt, sizeof(pkt));
619         utcp->send(utcp, &pkt, sizeof(pkt));
620
621         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
622         c->conn_timeout.tv_sec += utcp->timeout;
623
624         start_retransmit_timer(c);
625
626         return c;
627 }
628
629 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
630         return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
631 }
632
633 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
634         if(c->reapable || c->state != SYN_RECEIVED) {
635                 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
636                 return;
637         }
638
639         debug(c, "accepted %p %p\n", c, recv, priv);
640         c->recv = recv;
641         c->priv = priv;
642         set_state(c, ESTABLISHED);
643 }
644
/* Send as much not-yet-sent data as the windows allow, each packet carrying
 * an ACK for rcv.nxt.
 *
 * When sendatleastone is true a packet is sent even if no data can go out,
 * so the call can be used to emit a bare ACK. Packets are built in the
 * packet buffer of the utcp instance (c->utcp->pkt).
 */
static void ack(struct utcp_connection *c, bool sendatleastone) {
        // left: bytes queued between snd.nxt and snd.last.
        // cwndleft: room left in the smaller of the congestion window and
        // snd.wnd after subtracting what is already in flight.
        int32_t left = seqdiff(c->snd.last, c->snd.nxt);
        int32_t cwndleft = min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una);

        assert(left >= 0);

        if(cwndleft <= 0) {
                left = 0;
        } else if(cwndleft < left) {
                left = cwndleft;

                // Round down to whole segments, unless a packet must go out
                // anyway and the window holds no more than one segment.
                if(!sendatleastone || cwndleft > c->utcp->mss) {
                        left -= left % c->utcp->mss;
                }
        }

        debug(c, "cwndleft %d left %d\n", cwndleft, left);

        if(!left && !sendatleastone) {
                return;
        }

        struct {
                struct hdr hdr;
                uint8_t data[];
        } *pkt = c->utcp->pkt;

        pkt->hdr.src = c->src;
        pkt->hdr.dst = c->dst;
        pkt->hdr.ack = c->rcv.nxt;
        pkt->hdr.wnd = c->rcvbuf.maxsize;
        pkt->hdr.ctl = ACK;
        pkt->hdr.aux = 0;

        // do/while: the body runs once even when left is 0, which produces
        // the bare ACK requested through sendatleastone.
        do {
                uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
                pkt->hdr.seq = c->snd.nxt;

                buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);

                c->snd.nxt += seglen;
                left -= seglen;

                // The last unit of sequence space in a closing state is the
                // FIN: it is signalled by the flag, not by a payload byte.
                if(seglen && fin_wanted(c, c->snd.nxt)) {
                        seglen--;
                        pkt->hdr.ctl |= FIN;
                }

                if(!c->rtt_start.tv_sec) {
                        // Start RTT measurement
                        clock_gettime(UTCP_CLOCK, &c->rtt_start);
                        c->rtt_seq = pkt->hdr.seq + seglen;
                        debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
                }

                print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
                c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
        } while(left);
}
704
705 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
706         if(c->reapable) {
707                 debug(c, "send() called on closed connection\n");
708                 errno = EBADF;
709                 return -1;
710         }
711
712         switch(c->state) {
713         case CLOSED:
714         case LISTEN:
715                 debug(c, "send() called on unconnected connection\n");
716                 errno = ENOTCONN;
717                 return -1;
718
719         case SYN_SENT:
720         case SYN_RECEIVED:
721         case ESTABLISHED:
722         case CLOSE_WAIT:
723                 break;
724
725         case FIN_WAIT_1:
726         case FIN_WAIT_2:
727         case CLOSING:
728         case LAST_ACK:
729         case TIME_WAIT:
730                 debug(c, "send() called on closed connection\n");
731                 errno = EPIPE;
732                 return -1;
733         }
734
735         // Exit early if we have nothing to send.
736
737         if(!len) {
738                 return 0;
739         }
740
741         if(!data) {
742                 errno = EFAULT;
743                 return -1;
744         }
745
746         // Check if we need to be able to buffer all data
747
748         if(c->flags & UTCP_NO_PARTIAL) {
749                 if(len > buffer_free(&c->sndbuf)) {
750                         if(len > c->sndbuf.maxsize) {
751                                 errno = EMSGSIZE;
752                                 return -1;
753                         } else {
754                                 errno = EWOULDBLOCK;
755                                 return 0;
756                         }
757                 }
758         }
759
760         // Add data to send buffer.
761
762         if(is_reliable(c) || (c->state != SYN_SENT && c->state != SYN_RECEIVED)) {
763                 len = buffer_put(&c->sndbuf, data, len);
764         } else {
765                 return 0;
766         }
767
768         if(len <= 0) {
769                 if(is_reliable(c)) {
770                         errno = EWOULDBLOCK;
771                         return 0;
772                 } else {
773                         return len;
774                 }
775         }
776
777         c->snd.last += len;
778
779         // Don't send anything yet if the connection has not fully established yet
780
781         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
782                 return len;
783         }
784
785         ack(c, false);
786
787         if(!is_reliable(c)) {
788                 c->snd.una = c->snd.nxt = c->snd.last;
789                 buffer_discard(&c->sndbuf, c->sndbuf.used);
790                 c->do_poll = true;
791         }
792
793         if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) {
794                 start_retransmit_timer(c);
795         }
796
797         if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) {
798                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
799                 c->conn_timeout.tv_sec += c->utcp->timeout;
800         }
801
802         return len;
803 }
804
805 static void swap_ports(struct hdr *hdr) {
806         uint16_t tmp = hdr->src;
807         hdr->src = hdr->dst;
808         hdr->dst = tmp;
809 }
810
811 static void fast_retransmit(struct utcp_connection *c) {
812         if(c->state == CLOSED || c->snd.last == c->snd.una) {
813                 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
814                 return;
815         }
816
817         struct utcp *utcp = c->utcp;
818
819         struct {
820                 struct hdr hdr;
821                 uint8_t data[];
822         } *pkt;
823
824         pkt = malloc(c->utcp->mtu);
825
826         if(!pkt) {
827                 return;
828         }
829
830         pkt->hdr.src = c->src;
831         pkt->hdr.dst = c->dst;
832         pkt->hdr.wnd = c->rcvbuf.maxsize;
833         pkt->hdr.aux = 0;
834
835         switch(c->state) {
836         case ESTABLISHED:
837         case FIN_WAIT_1:
838         case CLOSE_WAIT:
839         case CLOSING:
840         case LAST_ACK:
841                 // Send unacked data again.
842                 pkt->hdr.seq = c->snd.una;
843                 pkt->hdr.ack = c->rcv.nxt;
844                 pkt->hdr.ctl = ACK;
845                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
846
847                 if(fin_wanted(c, c->snd.una + len)) {
848                         len--;
849                         pkt->hdr.ctl |= FIN;
850                 }
851
852                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
853                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
854                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
855                 break;
856
857         default:
858                 break;
859         }
860
861         free(pkt);
862 }
863
/* Retransmit whatever the current state calls for (SYN, SYN|ACK, or the
 * first unacknowledged segment), then re-arm the retransmission timer and
 * double the RTO of the utcp instance, capped at MAX_RTO.
 *
 * When there is nothing to retransmit the timer is stopped instead. Packets
 * are built in the packet buffer of the utcp instance (c->utcp->pkt).
 */
static void retransmit(struct utcp_connection *c) {
        if(c->state == CLOSED || c->snd.last == c->snd.una) {
                debug(c, "retransmit() called but nothing to retransmit!\n");
                stop_retransmit_timer(c);
                return;
        }

        struct utcp *utcp = c->utcp;

        struct {
                struct hdr hdr;
                uint8_t data[];
        } *pkt = c->utcp->pkt;

        pkt->hdr.src = c->src;
        pkt->hdr.dst = c->dst;
        pkt->hdr.wnd = c->rcvbuf.maxsize;
        pkt->hdr.aux = 0;

        switch(c->state) {
        case SYN_SENT:
                // Send our SYN again
                pkt->hdr.seq = c->snd.iss;
                pkt->hdr.ack = 0;
                pkt->hdr.ctl = SYN;
                pkt->hdr.aux = 0x0101;
                // Same four-byte init block as in utcp_connect_ex().
                pkt->data[0] = 1;
                pkt->data[1] = 0;
                pkt->data[2] = 0;
                pkt->data[3] = c->flags & 0x7;
                print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
                utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
                break;

        case SYN_RECEIVED:
                // Send SYNACK again
                pkt->hdr.seq = c->snd.nxt;
                pkt->hdr.ack = c->rcv.nxt;
                pkt->hdr.ctl = SYN | ACK;
                print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
                utcp->send(utcp, pkt, sizeof(pkt->hdr));
                break;

        case ESTABLISHED:
        case FIN_WAIT_1:
        case CLOSE_WAIT:
        case CLOSING:
        case LAST_ACK:
                // Send unacked data again.
                pkt->hdr.seq = c->snd.una;
                pkt->hdr.ack = c->rcv.nxt;
                pkt->hdr.ctl = ACK;
                uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

                // The FIN takes up sequence space but is not a payload byte.
                if(fin_wanted(c, c->snd.una + len)) {
                        len--;
                        pkt->hdr.ctl |= FIN;
                }

                // RFC 5681 slow start after timeout
                uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
                c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
                c->snd.cwnd = utcp->mss;
                debug_cwnd(c);

                buffer_copy(&c->sndbuf, pkt->data, 0, len);
                print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
                utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);

                // Pull snd.nxt back to just after the resent segment, so that
                // everything beyond it counts as unsent again.
                c->snd.nxt = c->snd.una + len;
                break;

        case CLOSED:
        case LISTEN:
        case TIME_WAIT:
        case FIN_WAIT_2:
                // We shouldn't need to retransmit anything in this state.
#ifdef UTCP_DEBUG
                abort();
#endif
                stop_retransmit_timer(c);
                goto cleanup;
        }

        // The timer is armed with the current RTO; the doubled value applies
        // from the next time the timer is started.
        start_retransmit_timer(c);
        utcp->rto *= 2;

        if(utcp->rto > MAX_RTO) {
                utcp->rto = MAX_RTO;
        }

        c->rtt_start.tv_sec = 0; // invalidate RTT timer
        c->dupack = 0; // cancel any ongoing fast recovery

cleanup:
        return;
}
961
/* Update receive buffer and SACK entries after consuming data.
 *
 * Situation:
 *
 * |.....0000..1111111111.....22222......3333|
 * |---------------^
 *
 * 0..3 represent the SACK entries. The ^ indicates up to which point we want
 * to remove data from the receive buffer. The idea is to subtract "len"
 * from the offset of all the SACK entries, and then remove/cut down entries
 * that are shifted to before the start of the receive buffer.
 *
 * There are three cases:
 * - the SACK entry is after ^, in that case just change the offset.
 * - the SACK entry starts before and ends after ^, so we have to
 *   change both its offset and size.
 * - the SACK entry is completely before ^, in that case delete it.
 *
 * The list of entries is terminated by the first entry with len == 0.
 */
static void sack_consume(struct utcp_connection *c, size_t len) {
        debug(c, "sack_consume %lu\n", (unsigned long)len);

        // More than is stored in the buffer: only the SACK list is emptied
        // (by terminating it at entry 0); the buffer itself is not touched.
        if(len > c->rcvbuf.used) {
                debug(c, "all SACK entries consumed\n");
                c->sacks[0].len = 0;
                return;
        }

        buffer_discard(&c->rcvbuf, len);

        // Note: i only advances when an entry is kept. After a deletion the
        // next entry has moved into slot i and is examined in the next pass.
        for(int i = 0; i < NSACKS && c->sacks[i].len;) {
                if(len < c->sacks[i].offset) {
                        // Entirely after the consumed range: shift it down.
                        c->sacks[i].offset -= len;
                        i++;
                } else if(len < c->sacks[i].offset + c->sacks[i].len) {
                        // Straddles the consumed range: keep only the part after it.
                        c->sacks[i].len -= len - c->sacks[i].offset;
                        c->sacks[i].offset = 0;
                        i++;
                } else {
                        // Entirely consumed: delete it by moving the following
                        // entries down, or by clearing it if it is the last slot.
                        if(i < NSACKS - 1) {
                                memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
                                c->sacks[NSACKS - 1].len = 0;
                        } else {
                                c->sacks[i].len = 0;
                                break;
                        }
                }
        }

        for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
                debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
        }
}
1014
1015 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1016         debug(c, "out of order packet, offset %u\n", offset);
1017         // Packet loss or reordering occured. Store the data in the buffer.
1018         ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
1019
1020         if(rxd < 0 || (size_t)rxd < len) {
1021                 abort();
1022         }
1023
1024         // Make note of where we put it.
1025         for(int i = 0; i < NSACKS; i++) {
1026                 if(!c->sacks[i].len) { // nothing to merge, add new entry
1027                         debug(c, "new SACK entry %d\n", i);
1028                         c->sacks[i].offset = offset;
1029                         c->sacks[i].len = rxd;
1030                         break;
1031                 } else if(offset < c->sacks[i].offset) {
1032                         if(offset + rxd < c->sacks[i].offset) { // insert before
1033                                 if(!c->sacks[NSACKS - 1].len) { // only if room left
1034                                         debug(c, "insert SACK entry at %d\n", i);
1035                                         memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
1036                                         c->sacks[i].offset = offset;
1037                                         c->sacks[i].len = rxd;
1038                                 } else {
1039                                         debug(c, "SACK entries full, dropping packet\n");
1040                                 }
1041
1042                                 break;
1043                         } else { // merge
1044                                 debug(c, "merge with start of SACK entry at %d\n", i);
1045                                 c->sacks[i].offset = offset;
1046                                 break;
1047                         }
1048                 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1049                         if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1050                                 debug(c, "merge with end of SACK entry at %d\n", i);
1051                                 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1052                                 // TODO: handle potential merge with next entry
1053                         }
1054
1055                         break;
1056                 }
1057         }
1058
1059         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1060                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1061         }
1062 }
1063
1064 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1065         if(c->recv) {
1066                 ssize_t rxd = c->recv(c, data, len);
1067
1068                 if(rxd != (ssize_t)len) {
1069                         // TODO: handle the application not accepting all data.
1070                         abort();
1071                 }
1072         }
1073
1074         // Check if we can process out-of-order data now.
1075         if(c->sacks[0].len && len >= c->sacks[0].offset) {
1076                 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1077
1078                 if(len < c->sacks[0].offset + c->sacks[0].len) {
1079                         size_t offset = len;
1080                         len = c->sacks[0].offset + c->sacks[0].len;
1081                         size_t remainder = len - offset;
1082                         ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder);
1083
1084                         if(rxd != (ssize_t)remainder) {
1085                                 // TODO: handle the application not accepting all data.
1086                                 abort();
1087                         }
1088                 }
1089         }
1090
1091         if(c->rcvbuf.used) {
1092                 sack_consume(c, len);
1093         }
1094
1095         c->rcv.nxt += len;
1096 }
1097
1098
1099 static void handle_incoming_data(struct utcp_connection *c, uint32_t seq, const void *data, size_t len) {
1100         if(!is_reliable(c)) {
1101                 c->recv(c, data, len);
1102                 c->rcv.nxt = seq + len;
1103                 return;
1104         }
1105
1106         uint32_t offset = seqdiff(seq, c->rcv.nxt);
1107
1108         if(offset + len > c->rcvbuf.maxsize) {
1109                 abort();
1110         }
1111
1112         if(offset) {
1113                 handle_out_of_order(c, offset, data, len);
1114         } else {
1115                 handle_in_order(c, data, len);
1116         }
1117 }
1118
1119
1120 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1121         const uint8_t *ptr = data;
1122
1123         if(!utcp) {
1124                 errno = EFAULT;
1125                 return -1;
1126         }
1127
1128         if(!len) {
1129                 return 0;
1130         }
1131
1132         if(!data) {
1133                 errno = EFAULT;
1134                 return -1;
1135         }
1136
1137         // Drop packets smaller than the header
1138
1139         struct hdr hdr;
1140
1141         if(len < sizeof(hdr)) {
1142                 print_packet(NULL, "recv", data, len);
1143                 errno = EBADMSG;
1144                 return -1;
1145         }
1146
1147         // Make a copy from the potentially unaligned data to a struct hdr
1148
1149         memcpy(&hdr, ptr, sizeof(hdr));
1150
1151         // Try to match the packet to an existing connection
1152
1153         struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1154         print_packet(c, "recv", data, len);
1155
1156         // Process the header
1157
1158         ptr += sizeof(hdr);
1159         len -= sizeof(hdr);
1160
1161         // Drop packets with an unknown CTL flag
1162
1163         if(hdr.ctl & ~(SYN | ACK | RST | FIN)) {
1164                 print_packet(NULL, "recv", data, len);
1165                 errno = EBADMSG;
1166                 return -1;
1167         }
1168
1169         // Check for auxiliary headers
1170
1171         const uint8_t *init = NULL;
1172
1173         uint16_t aux = hdr.aux;
1174
1175         while(aux) {
1176                 size_t auxlen = 4 * (aux >> 8) & 0xf;
1177                 uint8_t auxtype = aux & 0xff;
1178
1179                 if(len < auxlen) {
1180                         errno = EBADMSG;
1181                         return -1;
1182                 }
1183
1184                 switch(auxtype) {
1185                 case AUX_INIT:
1186                         if(!(hdr.ctl & SYN) || auxlen != 4) {
1187                                 errno = EBADMSG;
1188                                 return -1;
1189                         }
1190
1191                         init = ptr;
1192                         break;
1193
1194                 default:
1195                         errno = EBADMSG;
1196                         return -1;
1197                 }
1198
1199                 len -= auxlen;
1200                 ptr += auxlen;
1201
1202                 if(!(aux & 0x800)) {
1203                         break;
1204                 }
1205
1206                 if(len < 2) {
1207                         errno = EBADMSG;
1208                         return -1;
1209                 }
1210
1211                 memcpy(&aux, ptr, 2);
1212                 len -= 2;
1213                 ptr += 2;
1214         }
1215
1216         bool has_data = len || (hdr.ctl & (SYN | FIN));
1217
1218         // Is it for a new connection?
1219
1220         if(!c) {
1221                 // Ignore RST packets
1222
1223                 if(hdr.ctl & RST) {
1224                         return 0;
1225                 }
1226
1227                 // Is it a SYN packet and are we LISTENing?
1228
1229                 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1230                         // If we don't want to accept it, send a RST back
1231                         if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1232                                 len = 1;
1233                                 goto reset;
1234                         }
1235
1236                         // Try to allocate memory, otherwise send a RST back
1237                         c = allocate_connection(utcp, hdr.dst, hdr.src);
1238
1239                         if(!c) {
1240                                 len = 1;
1241                                 goto reset;
1242                         }
1243
1244                         // Parse auxilliary information
1245                         if(init) {
1246                                 if(init[0] < 1) {
1247                                         len = 1;
1248                                         goto reset;
1249                                 }
1250
1251                                 c->flags = init[3] & 0x7;
1252                         } else {
1253                                 c->flags = UTCP_TCP;
1254                         }
1255
1256 synack:
1257                         // Return SYN+ACK, go to SYN_RECEIVED state
1258                         c->snd.wnd = hdr.wnd;
1259                         c->rcv.irs = hdr.seq;
1260                         c->rcv.nxt = c->rcv.irs + 1;
1261                         set_state(c, SYN_RECEIVED);
1262
1263                         struct {
1264                                 struct hdr hdr;
1265                                 uint8_t data[4];
1266                         } pkt;
1267
1268                         pkt.hdr.src = c->src;
1269                         pkt.hdr.dst = c->dst;
1270                         pkt.hdr.ack = c->rcv.irs + 1;
1271                         pkt.hdr.seq = c->snd.iss;
1272                         pkt.hdr.wnd = c->rcvbuf.maxsize;
1273                         pkt.hdr.ctl = SYN | ACK;
1274
1275                         if(init) {
1276                                 pkt.hdr.aux = 0x0101;
1277                                 pkt.data[0] = 1;
1278                                 pkt.data[1] = 0;
1279                                 pkt.data[2] = 0;
1280                                 pkt.data[3] = c->flags & 0x7;
1281                                 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1282                                 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1283                         } else {
1284                                 pkt.hdr.aux = 0;
1285                                 print_packet(c, "send", &pkt, sizeof(hdr));
1286                                 utcp->send(utcp, &pkt, sizeof(hdr));
1287                         }
1288                 } else {
1289                         // No, we don't want your packets, send a RST back
1290                         len = 1;
1291                         goto reset;
1292                 }
1293
1294                 return 0;
1295         }
1296
1297         debug(c, "state %s\n", strstate[c->state]);
1298
1299         // In case this is for a CLOSED connection, ignore the packet.
1300         // TODO: make it so incoming packets can never match a CLOSED connection.
1301
1302         if(c->state == CLOSED) {
1303                 debug(c, "got packet for closed connection\n");
1304                 return 0;
1305         }
1306
1307         // It is for an existing connection.
1308
1309         // 1. Drop invalid packets.
1310
1311         // 1a. Drop packets that should not happen in our current state.
1312
1313         switch(c->state) {
1314         case SYN_SENT:
1315         case SYN_RECEIVED:
1316         case ESTABLISHED:
1317         case FIN_WAIT_1:
1318         case FIN_WAIT_2:
1319         case CLOSE_WAIT:
1320         case CLOSING:
1321         case LAST_ACK:
1322         case TIME_WAIT:
1323                 break;
1324
1325         default:
1326 #ifdef UTCP_DEBUG
1327                 abort();
1328 #endif
1329                 break;
1330         }
1331
1332         // 1b. Discard data that is not in our receive window.
1333
1334         if(is_reliable(c)) {
1335                 bool acceptable;
1336
1337                 if(c->state == SYN_SENT) {
1338                         acceptable = true;
1339                 } else if(len == 0) {
1340                         acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1341                 } else {
1342                         int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1343
1344                         // cut already accepted front overlapping
1345                         if(rcv_offset < 0) {
1346                                 acceptable = len > (size_t) - rcv_offset;
1347
1348                                 if(acceptable) {
1349                                         ptr -= rcv_offset;
1350                                         len += rcv_offset;
1351                                         hdr.seq -= rcv_offset;
1352                                 }
1353                         } else {
1354                                 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1355                         }
1356                 }
1357
1358                 if(!acceptable) {
1359                         debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1360
1361                         // Ignore unacceptable RST packets.
1362                         if(hdr.ctl & RST) {
1363                                 return 0;
1364                         }
1365
1366                         // Otherwise, continue processing.
1367                         len = 0;
1368                 }
1369         } else {
1370 #if UTCP_DEBUG
1371                 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1372
1373                 if(rcv_offset) {
1374                         debug(c, "packet out of order, offset %u bytes", rcv_offset);
1375                 }
1376
1377                 if(rcv_offset >= 0) {
1378                         c->rcv.nxt = hdr.seq + len;
1379                 }
1380
1381 #endif
1382         }
1383
1384         c->snd.wnd = hdr.wnd; // TODO: move below
1385
1386         // 1c. Drop packets with an invalid ACK.
1387         // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1388         // (= snd.una + c->sndbuf.used).
1389
1390         if(!is_reliable(c)) {
1391                 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1392                         hdr.ack = c->snd.una;
1393                 }
1394         }
1395
1396         if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1397                 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1398
1399                 // Ignore unacceptable RST packets.
1400                 if(hdr.ctl & RST) {
1401                         return 0;
1402                 }
1403
1404                 goto reset;
1405         }
1406
1407         // 2. Handle RST packets
1408
1409         if(hdr.ctl & RST) {
1410                 switch(c->state) {
1411                 case SYN_SENT:
1412                         if(!(hdr.ctl & ACK)) {
1413                                 return 0;
1414                         }
1415
1416                         // The peer has refused our connection.
1417                         set_state(c, CLOSED);
1418                         errno = ECONNREFUSED;
1419
1420                         if(c->recv) {
1421                                 c->recv(c, NULL, 0);
1422                         }
1423
1424                         if(c->poll && !c->reapable) {
1425                                 c->poll(c, 0);
1426                         }
1427
1428                         return 0;
1429
1430                 case SYN_RECEIVED:
1431                         if(hdr.ctl & ACK) {
1432                                 return 0;
1433                         }
1434
1435                         // We haven't told the application about this connection yet. Silently delete.
1436                         free_connection(c);
1437                         return 0;
1438
1439                 case ESTABLISHED:
1440                 case FIN_WAIT_1:
1441                 case FIN_WAIT_2:
1442                 case CLOSE_WAIT:
1443                         if(hdr.ctl & ACK) {
1444                                 return 0;
1445                         }
1446
1447                         // The peer has aborted our connection.
1448                         set_state(c, CLOSED);
1449                         errno = ECONNRESET;
1450
1451                         if(c->recv) {
1452                                 c->recv(c, NULL, 0);
1453                         }
1454
1455                         if(c->poll && !c->reapable) {
1456                                 c->poll(c, 0);
1457                         }
1458
1459                         return 0;
1460
1461                 case CLOSING:
1462                 case LAST_ACK:
1463                 case TIME_WAIT:
1464                         if(hdr.ctl & ACK) {
1465                                 return 0;
1466                         }
1467
1468                         // As far as the application is concerned, the connection has already been closed.
1469                         // If it has called utcp_close() already, we can immediately free this connection.
1470                         if(c->reapable) {
1471                                 free_connection(c);
1472                                 return 0;
1473                         }
1474
1475                         // Otherwise, immediately move to the CLOSED state.
1476                         set_state(c, CLOSED);
1477                         return 0;
1478
1479                 default:
1480 #ifdef UTCP_DEBUG
1481                         abort();
1482 #endif
1483                         break;
1484                 }
1485         }
1486
1487         uint32_t advanced;
1488
1489         if(!(hdr.ctl & ACK)) {
1490                 advanced = 0;
1491                 goto skip_ack;
1492         }
1493
1494         // 3. Advance snd.una
1495
1496         advanced = seqdiff(hdr.ack, c->snd.una);
1497
1498         if(advanced) {
1499                 // RTT measurement
1500                 if(c->rtt_start.tv_sec) {
1501                         if(c->rtt_seq == hdr.ack) {
1502                                 struct timespec now;
1503                                 clock_gettime(UTCP_CLOCK, &now);
1504                                 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1505                                 update_rtt(c, diff);
1506                                 c->rtt_start.tv_sec = 0;
1507                         } else if(c->rtt_seq < hdr.ack) {
1508                                 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1509                                 c->rtt_start.tv_sec = 0;
1510                         }
1511                 }
1512
1513                 int32_t data_acked = advanced;
1514
1515                 switch(c->state) {
1516                 case SYN_SENT:
1517                 case SYN_RECEIVED:
1518                         data_acked--;
1519                         break;
1520
1521                 // TODO: handle FIN as well.
1522                 default:
1523                         break;
1524                 }
1525
1526                 assert(data_acked >= 0);
1527
1528 #ifndef NDEBUG
1529                 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1530                 assert(data_acked <= bufused);
1531 #endif
1532
1533                 if(data_acked) {
1534                         buffer_discard(&c->sndbuf, data_acked);
1535                         c->do_poll = true;
1536                 }
1537
1538                 // Also advance snd.nxt if possible
1539                 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1540                         c->snd.nxt = hdr.ack;
1541                 }
1542
1543                 c->snd.una = hdr.ack;
1544
1545                 if(c->dupack) {
1546                         if(c->dupack >= 3) {
1547                                 debug(c, "fast recovery ended\n");
1548                                 c->snd.cwnd = c->snd.ssthresh;
1549                         }
1550
1551                         c->dupack = 0;
1552                 }
1553
1554                 // Increase the congestion window according to RFC 5681
1555                 if(c->snd.cwnd < c->snd.ssthresh) {
1556                         c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1557                 } else {
1558                         c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1559                 }
1560
1561                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1562                         c->snd.cwnd = c->sndbuf.maxsize;
1563                 }
1564
1565                 debug_cwnd(c);
1566
1567                 // Check if we have sent a FIN that is now ACKed.
1568                 switch(c->state) {
1569                 case FIN_WAIT_1:
1570                         if(c->snd.una == c->snd.last) {
1571                                 set_state(c, FIN_WAIT_2);
1572                         }
1573
1574                         break;
1575
1576                 case CLOSING:
1577                         if(c->snd.una == c->snd.last) {
1578                                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1579                                 c->conn_timeout.tv_sec += utcp->timeout;
1580                                 set_state(c, TIME_WAIT);
1581                         }
1582
1583                         break;
1584
1585                 default:
1586                         break;
1587                 }
1588         } else {
1589                 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1590                         c->dupack++;
1591                         debug(c, "duplicate ACK %d\n", c->dupack);
1592
1593                         if(c->dupack == 3) {
1594                                 // RFC 5681 fast recovery
1595                                 debug(c, "fast recovery started\n", c->dupack);
1596                                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1597                                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1598                                 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1599
1600                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1601                                         c->snd.cwnd = c->sndbuf.maxsize;
1602                                 }
1603
1604                                 debug_cwnd(c);
1605
1606                                 fast_retransmit(c);
1607                         } else if(c->dupack > 3) {
1608                                 c->snd.cwnd += utcp->mss;
1609
1610                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1611                                         c->snd.cwnd = c->sndbuf.maxsize;
1612                                 }
1613
1614                                 debug_cwnd(c);
1615                         }
1616
1617                         // We got an ACK which indicates the other side did get one of our packets.
1618                         // Reset the retransmission timer to avoid going to slow start,
1619                         // but don't touch the connection timeout.
1620                         start_retransmit_timer(c);
1621                 }
1622         }
1623
1624         // 4. Update timers
1625
1626         if(advanced) {
1627                 if(c->snd.una == c->snd.last) {
1628                         stop_retransmit_timer(c);
1629                         timespec_clear(&c->conn_timeout);
1630                 } else if(is_reliable(c)) {
1631                         start_retransmit_timer(c);
1632                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1633                         c->conn_timeout.tv_sec += utcp->timeout;
1634                 }
1635         }
1636
1637 skip_ack:
1638         // 5. Process SYN stuff
1639
1640         if(hdr.ctl & SYN) {
1641                 switch(c->state) {
1642                 case SYN_SENT:
1643
1644                         // This is a SYNACK. It should always have ACKed the SYN.
1645                         if(!advanced) {
1646                                 goto reset;
1647                         }
1648
1649                         c->rcv.irs = hdr.seq;
1650                         c->rcv.nxt = hdr.seq;
1651
1652                         if(c->shut_wr) {
1653                                 c->snd.last++;
1654                                 set_state(c, FIN_WAIT_1);
1655                         } else {
1656                                 set_state(c, ESTABLISHED);
1657                         }
1658
1659                         // TODO: notify application of this somehow.
1660                         break;
1661
1662                 case SYN_RECEIVED:
1663                         // This is a retransmit of a SYN, send back the SYNACK.
1664                         goto synack;
1665
1666                 case ESTABLISHED:
1667                 case FIN_WAIT_1:
1668                 case FIN_WAIT_2:
1669                 case CLOSE_WAIT:
1670                 case CLOSING:
1671                 case LAST_ACK:
1672                 case TIME_WAIT:
1673                         // Ehm, no. We should never receive a second SYN.
1674                         return 0;
1675
1676                 default:
1677 #ifdef UTCP_DEBUG
1678                         abort();
1679 #endif
1680                         return 0;
1681                 }
1682
1683                 // SYN counts as one sequence number
1684                 c->rcv.nxt++;
1685         }
1686
1687         // 6. Process new data
1688
1689         if(c->state == SYN_RECEIVED) {
1690                 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
1691                 if(!advanced) {
1692                         goto reset;
1693                 }
1694
1695                 // Are we still LISTENing?
1696                 if(utcp->accept) {
1697                         utcp->accept(c, c->src);
1698                 }
1699
1700                 if(c->state != ESTABLISHED) {
1701                         set_state(c, CLOSED);
1702                         c->reapable = true;
1703                         goto reset;
1704                 }
1705         }
1706
1707         if(len) {
1708                 switch(c->state) {
1709                 case SYN_SENT:
1710                 case SYN_RECEIVED:
1711                         // This should never happen.
1712 #ifdef UTCP_DEBUG
1713                         abort();
1714 #endif
1715                         return 0;
1716
1717                 case ESTABLISHED:
1718                 case FIN_WAIT_1:
1719                 case FIN_WAIT_2:
1720                         break;
1721
1722                 case CLOSE_WAIT:
1723                 case CLOSING:
1724                 case LAST_ACK:
1725                 case TIME_WAIT:
1726                         // Ehm no, We should never receive more data after a FIN.
1727                         goto reset;
1728
1729                 default:
1730 #ifdef UTCP_DEBUG
1731                         abort();
1732 #endif
1733                         return 0;
1734                 }
1735
1736                 handle_incoming_data(c, hdr.seq, ptr, len);
1737         }
1738
1739         // 7. Process FIN stuff
1740
1741         if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
1742                 switch(c->state) {
1743                 case SYN_SENT:
1744                 case SYN_RECEIVED:
1745                         // This should never happen.
1746 #ifdef UTCP_DEBUG
1747                         abort();
1748 #endif
1749                         break;
1750
1751                 case ESTABLISHED:
1752                         set_state(c, CLOSE_WAIT);
1753                         break;
1754
1755                 case FIN_WAIT_1:
1756                         set_state(c, CLOSING);
1757                         break;
1758
1759                 case FIN_WAIT_2:
1760                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1761                         c->conn_timeout.tv_sec += utcp->timeout;
1762                         set_state(c, TIME_WAIT);
1763                         break;
1764
1765                 case CLOSE_WAIT:
1766                 case CLOSING:
1767                 case LAST_ACK:
1768                 case TIME_WAIT:
1769                         // Ehm, no. We should never receive a second FIN.
1770                         goto reset;
1771
1772                 default:
1773 #ifdef UTCP_DEBUG
1774                         abort();
1775 #endif
1776                         break;
1777                 }
1778
1779                 // FIN counts as one sequence number
1780                 c->rcv.nxt++;
1781                 len++;
1782
1783                 // Inform the application that the peer closed its end of the connection.
1784                 if(c->recv) {
1785                         errno = 0;
1786                         c->recv(c, NULL, 0);
1787                 }
1788         }
1789
1790         // Now we send something back if:
1791         // - we received data, so we have to send back an ACK
1792         //   -> sendatleastone = true
1793         // - or we got an ack, so we should maybe send a bit more data
1794         //   -> sendatleastone = false
1795
1796         if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
1797                 ack(c, has_data);
1798         }
1799
1800         return 0;
1801
1802 reset:
1803         swap_ports(&hdr);
1804         hdr.wnd = 0;
1805         hdr.aux = 0;
1806
1807         if(hdr.ctl & ACK) {
1808                 hdr.seq = hdr.ack;
1809                 hdr.ctl = RST;
1810         } else {
1811                 hdr.ack = hdr.seq + len;
1812                 hdr.seq = 0;
1813                 hdr.ctl = RST | ACK;
1814         }
1815
1816         print_packet(c, "send", &hdr, sizeof(hdr));
1817         utcp->send(utcp, &hdr, sizeof(hdr));
1818         return 0;
1819
1820 }
1821
1822 int utcp_shutdown(struct utcp_connection *c, int dir) {
1823         debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
1824
1825         if(!c) {
1826                 errno = EFAULT;
1827                 return -1;
1828         }
1829
1830         if(c->reapable) {
1831                 debug(c, "shutdown() called on closed connection\n");
1832                 errno = EBADF;
1833                 return -1;
1834         }
1835
1836         if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
1837                 errno = EINVAL;
1838                 return -1;
1839         }
1840
1841         // TCP does not have a provision for stopping incoming packets.
1842         // The best we can do is to just ignore them.
1843         if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
1844                 c->recv = NULL;
1845         }
1846
1847         // The rest of the code deals with shutting down writes.
1848         if(dir == UTCP_SHUT_RD) {
1849                 return 0;
1850         }
1851
1852         // Only process shutting down writes once.
1853         if(c->shut_wr) {
1854                 return 0;
1855         }
1856
1857         c->shut_wr = true;
1858
1859         switch(c->state) {
1860         case CLOSED:
1861         case LISTEN:
1862                 errno = ENOTCONN;
1863                 return -1;
1864
1865         case SYN_SENT:
1866                 return 0;
1867
1868         case SYN_RECEIVED:
1869         case ESTABLISHED:
1870                 set_state(c, FIN_WAIT_1);
1871                 break;
1872
1873         case FIN_WAIT_1:
1874         case FIN_WAIT_2:
1875                 return 0;
1876
1877         case CLOSE_WAIT:
1878                 set_state(c, CLOSING);
1879                 break;
1880
1881         case CLOSING:
1882         case LAST_ACK:
1883         case TIME_WAIT:
1884                 return 0;
1885         }
1886
1887         c->snd.last++;
1888
1889         ack(c, false);
1890
1891         if(!timespec_isset(&c->rtrx_timeout)) {
1892                 start_retransmit_timer(c);
1893         }
1894
1895         return 0;
1896 }
1897
1898 static bool reset_connection(struct utcp_connection *c) {
1899         if(!c) {
1900                 errno = EFAULT;
1901                 return false;
1902         }
1903
1904         if(c->reapable) {
1905                 debug(c, "abort() called on closed connection\n");
1906                 errno = EBADF;
1907                 return false;
1908         }
1909
1910         c->recv = NULL;
1911         c->poll = NULL;
1912
1913         switch(c->state) {
1914         case CLOSED:
1915                 return true;
1916
1917         case LISTEN:
1918         case SYN_SENT:
1919         case CLOSING:
1920         case LAST_ACK:
1921         case TIME_WAIT:
1922                 set_state(c, CLOSED);
1923                 return true;
1924
1925         case SYN_RECEIVED:
1926         case ESTABLISHED:
1927         case FIN_WAIT_1:
1928         case FIN_WAIT_2:
1929         case CLOSE_WAIT:
1930                 set_state(c, CLOSED);
1931                 break;
1932         }
1933
1934         // Send RST
1935
1936         struct hdr hdr;
1937
1938         hdr.src = c->src;
1939         hdr.dst = c->dst;
1940         hdr.seq = c->snd.nxt;
1941         hdr.ack = 0;
1942         hdr.wnd = 0;
1943         hdr.ctl = RST;
1944
1945         print_packet(c, "send", &hdr, sizeof(hdr));
1946         c->utcp->send(c->utcp, &hdr, sizeof(hdr));
1947         return true;
1948 }
1949
1950 // Closes all the opened connections
1951 void utcp_abort_all_connections(struct utcp *utcp) {
1952         if(!utcp) {
1953                 errno = EINVAL;
1954                 return;
1955         }
1956
1957         for(int i = 0; i < utcp->nconnections; i++) {
1958                 struct utcp_connection *c = utcp->connections[i];
1959
1960                 if(c->reapable || c->state == CLOSED) {
1961                         continue;
1962                 }
1963
1964                 utcp_recv_t old_recv = c->recv;
1965                 utcp_poll_t old_poll = c->poll;
1966
1967                 reset_connection(c);
1968
1969                 if(old_recv) {
1970                         errno = 0;
1971                         old_recv(c, NULL, 0);
1972                 }
1973
1974                 if(old_poll && !c->reapable) {
1975                         errno = 0;
1976                         old_poll(c, 0);
1977                 }
1978         }
1979
1980         return;
1981 }
1982
1983 int utcp_close(struct utcp_connection *c) {
1984         if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
1985                 return -1;
1986         }
1987
1988         c->recv = NULL;
1989         c->poll = NULL;
1990         c->reapable = true;
1991         return 0;
1992 }
1993
1994 int utcp_abort(struct utcp_connection *c) {
1995         if(!reset_connection(c)) {
1996                 return -1;
1997         }
1998
1999         c->reapable = true;
2000         return 0;
2001 }
2002
2003 /* Handle timeouts.
2004  * One call to this function will loop through all connections,
2005  * checking if something needs to be resent or not.
2006  * The return value is the time to the next timeout in milliseconds,
2007  * or maybe a negative value if the timeout is infinite.
2008  */
2009 struct timespec utcp_timeout(struct utcp *utcp) {
2010         struct timespec now;
2011         clock_gettime(UTCP_CLOCK, &now);
2012         struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
2013
2014         for(int i = 0; i < utcp->nconnections; i++) {
2015                 struct utcp_connection *c = utcp->connections[i];
2016
2017                 if(!c) {
2018                         continue;
2019                 }
2020
2021                 // delete connections that have been utcp_close()d.
2022                 if(c->state == CLOSED) {
2023                         if(c->reapable) {
2024                                 debug(c, "reaping\n");
2025                                 free_connection(c);
2026                                 i--;
2027                         }
2028
2029                         continue;
2030                 }
2031
2032                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
2033                         errno = ETIMEDOUT;
2034                         c->state = CLOSED;
2035
2036                         if(c->recv) {
2037                                 c->recv(c, NULL, 0);
2038                         }
2039
2040                         if(c->poll && !c->reapable) {
2041                                 c->poll(c, 0);
2042                         }
2043
2044                         continue;
2045                 }
2046
2047                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2048                         debug(c, "retransmitting after timeout\n");
2049                         retransmit(c);
2050                 }
2051
2052                 if(c->poll) {
2053                         if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2054                                 c->do_poll = false;
2055                                 uint32_t len = buffer_free(&c->sndbuf);
2056
2057                                 if(len) {
2058                                         c->poll(c, len);
2059                                 }
2060                         } else if(c->state == CLOSED) {
2061                                 c->poll(c, 0);
2062                         }
2063                 }
2064
2065                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2066                         next = c->conn_timeout;
2067                 }
2068
2069                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2070                         next = c->rtrx_timeout;
2071                 }
2072         }
2073
2074         struct timespec diff;
2075
2076         timespec_sub(&next, &now, &diff);
2077
2078         return diff;
2079 }
2080
2081 bool utcp_is_active(struct utcp *utcp) {
2082         if(!utcp) {
2083                 return false;
2084         }
2085
2086         for(int i = 0; i < utcp->nconnections; i++)
2087                 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
2088                         return true;
2089                 }
2090
2091         return false;
2092 }
2093
2094 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2095         if(!send) {
2096                 errno = EFAULT;
2097                 return NULL;
2098         }
2099
2100         struct utcp *utcp = calloc(1, sizeof(*utcp));
2101
2102         if(!utcp) {
2103                 return NULL;
2104         }
2105
2106         if(!CLOCK_GRANULARITY) {
2107                 struct timespec res;
2108                 clock_getres(UTCP_CLOCK, &res);
2109                 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2110         }
2111
2112         utcp->accept = accept;
2113         utcp->pre_accept = pre_accept;
2114         utcp->send = send;
2115         utcp->priv = priv;
2116         utcp_set_mtu(utcp, DEFAULT_MTU);
2117         utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
2118         utcp->rto = START_RTO; // usec
2119
2120         return utcp;
2121 }
2122
2123 void utcp_exit(struct utcp *utcp) {
2124         if(!utcp) {
2125                 return;
2126         }
2127
2128         for(int i = 0; i < utcp->nconnections; i++) {
2129                 struct utcp_connection *c = utcp->connections[i];
2130
2131                 if(!c->reapable) {
2132                         if(c->recv) {
2133                                 c->recv(c, NULL, 0);
2134                         }
2135
2136                         if(c->poll && !c->reapable) {
2137                                 c->poll(c, 0);
2138                         }
2139                 }
2140
2141                 buffer_exit(&c->rcvbuf);
2142                 buffer_exit(&c->sndbuf);
2143                 free(c);
2144         }
2145
2146         free(utcp->connections);
2147         free(utcp->pkt);
2148         free(utcp);
2149 }
2150
2151 uint16_t utcp_get_mtu(struct utcp *utcp) {
2152         return utcp ? utcp->mtu : 0;
2153 }
2154
2155 uint16_t utcp_get_mss(struct utcp *utcp) {
2156         return utcp ? utcp->mss : 0;
2157 }
2158
2159 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2160         if(!utcp) {
2161                 return;
2162         }
2163
2164         if(mtu <= sizeof(struct hdr)) {
2165                 return;
2166         }
2167
2168         if(mtu > utcp->mtu) {
2169                 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2170
2171                 if(!new) {
2172                         return;
2173                 }
2174
2175                 utcp->pkt = new;
2176         }
2177
2178         utcp->mtu = mtu;
2179         utcp->mss = mtu - sizeof(struct hdr);
2180 }
2181
2182 void utcp_reset_timers(struct utcp *utcp) {
2183         if(!utcp) {
2184                 return;
2185         }
2186
2187         struct timespec now, then;
2188
2189         clock_gettime(UTCP_CLOCK, &now);
2190
2191         then = now;
2192
2193         then.tv_sec += utcp->timeout;
2194
2195         for(int i = 0; i < utcp->nconnections; i++) {
2196                 struct utcp_connection *c = utcp->connections[i];
2197
2198                 if(c->reapable) {
2199                         continue;
2200                 }
2201
2202                 if(timespec_isset(&c->rtrx_timeout)) {
2203                         c->rtrx_timeout = now;
2204                 }
2205
2206                 if(timespec_isset(&c->conn_timeout)) {
2207                         c->conn_timeout = then;
2208                 }
2209
2210                 c->rtt_start.tv_sec = 0;
2211         }
2212
2213         if(utcp->rto > START_RTO) {
2214                 utcp->rto = START_RTO;
2215         }
2216 }
2217
2218 int utcp_get_user_timeout(struct utcp *u) {
2219         return u ? u->timeout : 0;
2220 }
2221
2222 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2223         if(u) {
2224                 u->timeout = timeout;
2225         }
2226 }
2227
2228 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2229         return c ? c->sndbuf.maxsize : 0;
2230 }
2231
2232 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2233         if(!c) {
2234                 return 0;
2235         }
2236
2237         switch(c->state) {
2238         case SYN_SENT:
2239         case SYN_RECEIVED:
2240         case ESTABLISHED:
2241         case CLOSE_WAIT:
2242                 return buffer_free(&c->sndbuf);
2243
2244         default:
2245                 return 0;
2246         }
2247 }
2248
2249 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2250         if(!c) {
2251                 return;
2252         }
2253
2254         c->sndbuf.maxsize = size;
2255
2256         if(c->sndbuf.maxsize != size) {
2257                 c->sndbuf.maxsize = -1;
2258         }
2259
2260         c->do_poll = buffer_free(&c->sndbuf);
2261 }
2262
2263 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2264         return c ? c->rcvbuf.maxsize : 0;
2265 }
2266
2267 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2268         if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2269                 return buffer_free(&c->rcvbuf);
2270         } else {
2271                 return 0;
2272         }
2273 }
2274
2275 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2276         if(!c) {
2277                 return;
2278         }
2279
2280         c->rcvbuf.maxsize = size;
2281
2282         if(c->rcvbuf.maxsize != size) {
2283                 c->rcvbuf.maxsize = -1;
2284         }
2285 }
2286
2287 size_t utcp_get_sendq(struct utcp_connection *c) {
2288         return c->sndbuf.used;
2289 }
2290
2291 size_t utcp_get_recvq(struct utcp_connection *c) {
2292         return c->rcvbuf.used;
2293 }
2294
2295 bool utcp_get_nodelay(struct utcp_connection *c) {
2296         return c ? c->nodelay : false;
2297 }
2298
2299 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2300         if(c) {
2301                 c->nodelay = nodelay;
2302         }
2303 }
2304
2305 bool utcp_get_keepalive(struct utcp_connection *c) {
2306         return c ? c->keepalive : false;
2307 }
2308
2309 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2310         if(c) {
2311                 c->keepalive = keepalive;
2312         }
2313 }
2314
2315 size_t utcp_get_outq(struct utcp_connection *c) {
2316         return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
2317 }
2318
2319 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
2320         if(c) {
2321                 c->recv = recv;
2322         }
2323 }
2324
2325 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2326         if(c) {
2327                 c->poll = poll;
2328                 c->do_poll = buffer_free(&c->sndbuf);
2329         }
2330 }
2331
2332 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2333         if(utcp) {
2334                 utcp->accept = accept;
2335                 utcp->pre_accept = pre_accept;
2336         }
2337 }
2338
2339 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2340         if(!c || c->reapable) {
2341                 return;
2342         }
2343
2344         if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2345                 return;
2346         }
2347
2348         if(expect) {
2349                 // If we expect data, start the connection timer.
2350                 if(!timespec_isset(&c->conn_timeout)) {
2351                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2352                         c->conn_timeout.tv_sec += c->utcp->timeout;
2353                 }
2354         } else {
2355                 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2356                 if(c->snd.una == c->snd.last) {
2357                         timespec_clear(&c->conn_timeout);
2358                 }
2359         }
2360 }
2361
2362 void utcp_offline(struct utcp *utcp, bool offline) {
2363         struct timespec now;
2364         clock_gettime(UTCP_CLOCK, &now);
2365
2366         for(int i = 0; i < utcp->nconnections; i++) {
2367                 struct utcp_connection *c = utcp->connections[i];
2368
2369                 if(c->reapable) {
2370                         continue;
2371                 }
2372
2373                 utcp_expect_data(c, offline);
2374
2375                 if(!offline) {
2376                         if(timespec_isset(&c->rtrx_timeout)) {
2377                                 c->rtrx_timeout = now;
2378                         }
2379
2380                         utcp->connections[i]->rtt_start.tv_sec = 0;
2381                 }
2382         }
2383
2384         if(!offline && utcp->rto > START_RTO) {
2385                 utcp->rto = START_RTO;
2386         }
2387 }
2388
2389 void utcp_set_clock_granularity(long granularity) {
2390         CLOCK_GRANULARITY = granularity;
2391 }