]> git.meshlink.io Git - meshlink/blob - src/utcp.c
Wake up the MeshLink thread if framed channel data is pending to be flushed.
[meshlink] / src / utcp.c
1 /*
2     utcp.c -- Userspace TCP
3     Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <time.h>
31
32 #include "utcp_priv.h"
33
34 #ifndef EBADMSG
35 #define EBADMSG         104
36 #endif
37
38 #ifndef SHUT_RDWR
39 #define SHUT_RDWR 2
40 #endif
41
42 #ifdef poll
43 #undef poll
44 #endif
45
46 #ifndef UTCP_CLOCK
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
49 #else
50 #define UTCP_CLOCK CLOCK_MONOTONIC
51 #endif
52 #endif
53
54 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
55         r->tv_sec = a->tv_sec - b->tv_sec;
56         r->tv_nsec = a->tv_nsec - b->tv_nsec;
57
58         if(r->tv_nsec < 0) {
59                 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
60         }
61 }
62
/* Return a - b expressed in microseconds.
   The result is truncated to int32_t, so only short intervals are representable. */
static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
	time_t whole_usec = (a->tv_sec - b->tv_sec) * 1000000;
	long frac_usec = (a->tv_nsec - b->tv_nsec) / 1000;

	return whole_usec + frac_usec;
}
66
/* Return true if a is strictly earlier than b.
   Seconds are compared first; nanoseconds only break a tie. */
static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
	if(a->tv_sec != b->tv_sec) {
		return a->tv_sec < b->tv_sec;
	}

	return a->tv_nsec < b->tv_nsec;
}
74
/* Reset a to the zero time, which timespec_isset() reports as "not set". */
static void timespec_clear(struct timespec *a) {
	a->tv_nsec = 0;
	a->tv_sec = 0;
}
79
/* Return true if a holds a time value.
   Only the seconds field is inspected: a value with tv_sec == 0 counts as unset
   regardless of tv_nsec. */
static bool timespec_isset(const struct timespec *a) {
	return a->tv_sec != 0;
}
83
/* Clock granularity in microseconds.
   update_rtt() uses it as the lower bound for the variance term of the RTO. */
static long CLOCK_GRANULARITY;
85
/* Return the smaller of two sizes. */
static inline size_t min(size_t a, size_t b) {
	if(a < b) {
		return a;
	}

	return b;
}
89
/* Return the larger of two sizes. */
static inline size_t max(size_t a, size_t b) {
	if(a > b) {
		return a;
	}

	return b;
}
93
94 #ifdef UTCP_DEBUG
95 #include <stdarg.h>
96
97 #ifndef UTCP_DEBUG_DATALEN
98 #define UTCP_DEBUG_DATALEN 20
99 #endif
100
101 static void debug(struct utcp_connection *c, const char *format, ...) {
102         struct timespec tv;
103         char buf[1024];
104         int len;
105
106         clock_gettime(CLOCK_REALTIME, &tv);
107         len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);
108         va_list ap;
109         va_start(ap, format);
110         len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
111         va_end(ap);
112
113         if(len > 0 && (size_t)len < sizeof(buf)) {
114                 fwrite(buf, len, 1, stderr);
115         }
116 }
117
118 static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
119         struct hdr hdr;
120
121         if(len < sizeof(hdr)) {
122                 debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
123                 return;
124         }
125
126         memcpy(&hdr, pkt, sizeof(hdr));
127
128         uint32_t datalen;
129
130         if(len > sizeof(hdr)) {
131                 datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
132         } else {
133                 datalen = 0;
134         }
135
136
137         const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
138         char str[datalen * 2 + 1];
139         char *p = str;
140
141         for(uint32_t i = 0; i < datalen; i++) {
142                 *p++ = "0123456789ABCDEF"[data[i] >> 4];
143                 *p++ = "0123456789ABCDEF"[data[i] & 15];
144         }
145
146         *p = 0;
147
148         debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n",
149               dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
150               hdr.ctl & SYN ? "SYN" : "",
151               hdr.ctl & RST ? "RST" : "",
152               hdr.ctl & FIN ? "FIN" : "",
153               hdr.ctl & ACK ? "ACK" : "",
154               hdr.ctl & MF ? "MF" : "",
155               str
156              );
157 }
158
159 static void debug_cwnd(struct utcp_connection *c) {
160         debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
161 }
162 #else
// Debugging is disabled: the logging helpers expand to empty statements,
// so their arguments are never evaluated.
#define debug(...) do {} while(0)
#define print_packet(...) do {} while(0)
#define debug_cwnd(...) do {} while(0)
166 #endif
167
168 static void set_state(struct utcp_connection *c, enum state state) {
169         c->state = state;
170
171         if(state == ESTABLISHED) {
172                 timespec_clear(&c->conn_timeout);
173         }
174
175         debug(c, "state %s\n", strstate[state]);
176 }
177
178 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
179         if(seq != c->snd.last) {
180                 return false;
181         }
182
183         switch(c->state) {
184         case FIN_WAIT_1:
185         case CLOSING:
186         case LAST_ACK:
187                 return true;
188
189         default:
190                 return false;
191         }
192 }
193
194 static bool is_reliable(struct utcp_connection *c) {
195         return c->flags & UTCP_RELIABLE;
196 }
197
198 static bool is_framed(struct utcp_connection *c) {
199         return c->flags & UTCP_FRAMED;
200 }
201
/* Return the signed distance a - b between two sequence numbers.
   The subtraction is done modulo 2^32, so the result is correct across wrap-around. */
static int32_t seqdiff(uint32_t a, uint32_t b) {
	uint32_t delta = a - b;

	return delta;
}
205
206 // Buffer functions
207 static bool buffer_wraps(struct buffer *buf) {
208         return buf->size - buf->offset < buf->used;
209 }
210
211 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
212         char *newdata = realloc(buf->data, newsize);
213
214         if(!newdata) {
215                 return false;
216         }
217
218         buf->data = newdata;
219
220         if(buffer_wraps(buf)) {
221                 // Shift the right part of the buffer until it hits the end of the new buffer.
222                 // Old situation:
223                 // [345......012]
224                 // New situation:
225                 // [345.........|........012]
226                 uint32_t tailsize = buf->size - buf->offset;
227                 uint32_t newoffset = newsize - tailsize;
228                 memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
229                 buf->offset = newoffset;
230         }
231
232         buf->size = newsize;
233         return true;
234 }
235
236 // Store data into the buffer
237 static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
238         debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
239
240         // Ensure we don't store more than maxsize bytes in total
241         size_t required = offset + len;
242
243         if(required > buf->maxsize) {
244                 if(offset >= buf->maxsize) {
245                         return 0;
246                 }
247
248                 len = buf->maxsize - offset;
249                 required = buf->maxsize;
250         }
251
252         // Check if we need to resize the buffer
253         if(required > buf->size) {
254                 size_t newsize = buf->size;
255
256                 if(!newsize) {
257                         newsize = 4096;
258                 }
259
260                 do {
261                         newsize *= 2;
262                 } while(newsize < required);
263
264                 if(newsize > buf->maxsize) {
265                         newsize = buf->maxsize;
266                 }
267
268                 if(!buffer_resize(buf, newsize)) {
269                         return -1;
270                 }
271         }
272
273         uint32_t realoffset = buf->offset + offset;
274
275         if(buf->size - buf->offset <= offset) {
276                 // The offset wrapped
277                 realoffset -= buf->size;
278         }
279
280         if(buf->size - realoffset < len) {
281                 // The new chunk of data must be wrapped
282                 memcpy(buf->data + realoffset, data, buf->size - realoffset);
283                 memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
284         } else {
285                 memcpy(buf->data + realoffset, data, len);
286         }
287
288         if(required > buf->used) {
289                 buf->used = required;
290         }
291
292         return len;
293 }
294
295 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
296         return buffer_put_at(buf, buf->used, data, len);
297 }
298
299 // Copy data from the buffer without removing it.
300 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
301         // Ensure we don't copy more than is actually stored in the buffer
302         if(offset >= buf->used) {
303                 return 0;
304         }
305
306         if(buf->used - offset < len) {
307                 len = buf->used - offset;
308         }
309
310         uint32_t realoffset = buf->offset + offset;
311
312         if(buf->size - buf->offset <= offset) {
313                 // The offset wrapped
314                 realoffset -= buf->size;
315         }
316
317         if(buf->size - realoffset < len) {
318                 // The data is wrapped
319                 memcpy(data, buf->data + realoffset, buf->size - realoffset);
320                 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
321         } else {
322                 memcpy(data, buf->data + realoffset, len);
323         }
324
325         return len;
326 }
327
328 // Copy data from the buffer without removing it.
329 static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
330         if(!c->recv) {
331                 return len;
332         }
333
334         // Ensure we don't copy more than is actually stored in the buffer
335         if(offset >= buf->used) {
336                 return 0;
337         }
338
339         if(buf->used - offset < len) {
340                 len = buf->used - offset;
341         }
342
343         uint32_t realoffset = buf->offset + offset;
344
345         if(buf->size - buf->offset <= offset) {
346                 // The offset wrapped
347                 realoffset -= buf->size;
348         }
349
350         if(buf->size - realoffset < len) {
351                 // The data is wrapped
352                 ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);
353
354                 if(rx1 < buf->size - realoffset) {
355                         return rx1;
356                 }
357
358                 // The channel might have been closed by the previous callback
359                 if(!c->recv) {
360                         return len;
361                 }
362
363                 ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));
364
365                 if(rx2 < 0) {
366                         return rx2;
367                 } else {
368                         return rx1 + rx2;
369                 }
370         } else {
371                 return c->recv(c, buf->data + realoffset, len);
372         }
373 }
374
375 // Discard data from the buffer.
376 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
377         if(buf->used < len) {
378                 len = buf->used;
379         }
380
381         if(buf->size - buf->offset <= len) {
382                 buf->offset -= buf->size;
383         }
384
385         if(buf->used == len) {
386                 buf->offset = 0;
387         } else {
388                 buf->offset += len;
389         }
390
391         buf->used -= len;
392
393         return len;
394 }
395
396 static void buffer_clear(struct buffer *buf) {
397         buf->used = 0;
398         buf->offset = 0;
399 }
400
401 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
402         if(maxsize < minsize) {
403                 maxsize = minsize;
404         }
405
406         buf->maxsize = maxsize;
407
408         return buf->size >= minsize || buffer_resize(buf, minsize);
409 }
410
411 static void buffer_exit(struct buffer *buf) {
412         free(buf->data);
413         memset(buf, 0, sizeof(*buf));
414 }
415
416 static uint32_t buffer_free(const struct buffer *buf) {
417         return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0;
418 }
419
420 // Connections are stored in a sorted list.
421 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
422
423 static int compare(const void *va, const void *vb) {
424         assert(va && vb);
425
426         const struct utcp_connection *a = *(struct utcp_connection **)va;
427         const struct utcp_connection *b = *(struct utcp_connection **)vb;
428
429         assert(a && b);
430         assert(a->src && b->src);
431
432         int c = (int)a->src - (int)b->src;
433
434         if(c) {
435                 return c;
436         }
437
438         c = (int)a->dst - (int)b->dst;
439         return c;
440 }
441
442 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
443         if(!utcp->nconnections) {
444                 return NULL;
445         }
446
447         struct utcp_connection key = {
448                 .src = src,
449                 .dst = dst,
450         }, *keyp = &key;
451         struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
452         return match ? *match : NULL;
453 }
454
455 static void free_connection(struct utcp_connection *c) {
456         struct utcp *utcp = c->utcp;
457         struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
458
459         assert(cp);
460
461         int i = cp - utcp->connections;
462         memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
463         utcp->nconnections--;
464
465         buffer_exit(&c->rcvbuf);
466         buffer_exit(&c->sndbuf);
467         free(c);
468 }
469
470 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
471         // Check whether this combination of src and dst is free
472
473         if(src) {
474                 if(find_connection(utcp, src, dst)) {
475                         errno = EADDRINUSE;
476                         return NULL;
477                 }
478         } else { // If src == 0, generate a random port number with the high bit set
479                 if(utcp->nconnections >= 32767) {
480                         errno = ENOMEM;
481                         return NULL;
482                 }
483
484                 src = rand() | 0x8000;
485
486                 while(find_connection(utcp, src, dst)) {
487                         src++;
488                 }
489         }
490
491         // Allocate memory for the new connection
492
493         if(utcp->nconnections >= utcp->nallocated) {
494                 if(!utcp->nallocated) {
495                         utcp->nallocated = 4;
496                 } else {
497                         utcp->nallocated *= 2;
498                 }
499
500                 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
501
502                 if(!new_array) {
503                         return NULL;
504                 }
505
506                 utcp->connections = new_array;
507         }
508
509         struct utcp_connection *c = calloc(1, sizeof(*c));
510
511         if(!c) {
512                 return NULL;
513         }
514
515         if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
516                 free(c);
517                 return NULL;
518         }
519
520         if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
521                 buffer_exit(&c->sndbuf);
522                 free(c);
523                 return NULL;
524         }
525
526         // Fill in the details
527
528         c->src = src;
529         c->dst = dst;
530 #ifdef UTCP_DEBUG
531         c->snd.iss = 0;
532 #else
533         c->snd.iss = rand();
534 #endif
535         c->snd.una = c->snd.iss;
536         c->snd.nxt = c->snd.iss + 1;
537         c->snd.last = c->snd.nxt;
538         c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
539         c->snd.ssthresh = ~0;
540         debug_cwnd(c);
541         c->srtt = 0;
542         c->rttvar = 0;
543         c->rto = START_RTO;
544         c->utcp = utcp;
545
546         // Add it to the sorted list of connections
547
548         utcp->connections[utcp->nconnections++] = c;
549         qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
550
551         return c;
552 }
553
/* Return the absolute difference |a - b| of two unsigned values. */
static inline uint32_t absdiff(uint32_t a, uint32_t b) {
	return a > b ? a - b : b - a;
}
561
562 // Update RTT variables. See RFC 6298.
563 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
564         if(!rtt) {
565                 debug(c, "invalid rtt\n");
566                 return;
567         }
568
569         if(!c->srtt) {
570                 c->srtt = rtt;
571                 c->rttvar = rtt / 2;
572         } else {
573                 c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4;
574                 c->srtt = (c->srtt * 7 + rtt) / 8;
575         }
576
577         c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY);
578
579         if(c->rto > MAX_RTO) {
580                 c->rto = MAX_RTO;
581         }
582
583         debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto);
584 }
585
586 static void start_retransmit_timer(struct utcp_connection *c) {
587         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
588
589         uint32_t rto = c->rto;
590
591         while(rto > USEC_PER_SEC) {
592                 c->rtrx_timeout.tv_sec++;
593                 rto -= USEC_PER_SEC;
594         }
595
596         c->rtrx_timeout.tv_nsec += rto * 1000;
597
598         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
599                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
600                 c->rtrx_timeout.tv_sec++;
601         }
602
603         debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
604 }
605
606 static void start_flush_timer(struct utcp_connection *c) {
607         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
608
609         c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;
610
611         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
612                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
613                 c->rtrx_timeout.tv_sec++;
614         }
615
616         debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
617 }
618
619 static void stop_retransmit_timer(struct utcp_connection *c) {
620         timespec_clear(&c->rtrx_timeout);
621         debug(c, "rtrx_timeout cleared\n");
622 }
623
624 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
625         struct utcp_connection *c = allocate_connection(utcp, 0, dst);
626
627         if(!c) {
628                 return NULL;
629         }
630
631         assert((flags & ~0x1f) == 0);
632
633         c->flags = flags;
634         c->recv = recv;
635         c->priv = priv;
636
637         struct {
638                 struct hdr hdr;
639                 uint8_t init[4];
640         } pkt;
641
642         pkt.hdr.src = c->src;
643         pkt.hdr.dst = c->dst;
644         pkt.hdr.seq = c->snd.iss;
645         pkt.hdr.ack = 0;
646         pkt.hdr.wnd = c->rcvbuf.maxsize;
647         pkt.hdr.ctl = SYN;
648         pkt.hdr.aux = 0x0101;
649         pkt.init[0] = 1;
650         pkt.init[1] = 0;
651         pkt.init[2] = 0;
652         pkt.init[3] = flags & 0x7;
653
654         set_state(c, SYN_SENT);
655
656         print_packet(c, "send", &pkt, sizeof(pkt));
657         utcp->send(utcp, &pkt, sizeof(pkt));
658
659         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
660         c->conn_timeout.tv_sec += utcp->timeout;
661
662         start_retransmit_timer(c);
663
664         return c;
665 }
666
667 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
668         return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
669 }
670
671 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
672         if(c->reapable || c->state != SYN_RECEIVED) {
673                 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
674                 return;
675         }
676
677         debug(c, "accepted %p %p\n", c, recv, priv);
678         c->recv = recv;
679         c->priv = priv;
680         c->do_poll = true;
681         set_state(c, ESTABLISHED);
682 }
683
684 static void ack(struct utcp_connection *c, bool sendatleastone) {
685         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
686         int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE;
687
688         assert(left >= 0);
689
690         if(cwndleft <= 0) {
691                 left = 0;
692         } else if(cwndleft < left) {
693                 left = cwndleft;
694
695                 if(!sendatleastone || cwndleft > c->utcp->mss) {
696                         left -= left % c->utcp->mss;
697                 }
698         }
699
700         debug(c, "cwndleft %d left %d\n", cwndleft, left);
701
702         if(!left && !sendatleastone) {
703                 return;
704         }
705
706         struct {
707                 struct hdr hdr;
708                 uint8_t data[];
709         } *pkt = c->utcp->pkt;
710
711         pkt->hdr.src = c->src;
712         pkt->hdr.dst = c->dst;
713         pkt->hdr.ack = c->rcv.nxt;
714         pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0;
715         pkt->hdr.ctl = ACK;
716         pkt->hdr.aux = 0;
717
718         do {
719                 uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
720                 pkt->hdr.seq = c->snd.nxt;
721
722                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
723
724                 c->snd.nxt += seglen;
725                 left -= seglen;
726
727                 if(!is_reliable(c)) {
728                         if(left) {
729                                 pkt->hdr.ctl |= MF;
730                         } else {
731                                 pkt->hdr.ctl &= ~MF;
732                         }
733                 }
734
735                 if(seglen && fin_wanted(c, c->snd.nxt)) {
736                         seglen--;
737                         pkt->hdr.ctl |= FIN;
738                 }
739
740                 if(!c->rtt_start.tv_sec && is_reliable(c)) {
741                         // Start RTT measurement
742                         clock_gettime(UTCP_CLOCK, &c->rtt_start);
743                         c->rtt_seq = pkt->hdr.seq + seglen;
744                         debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
745                 }
746
747                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
748                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
749
750                 if(left && !is_reliable(c)) {
751                         pkt->hdr.wnd += seglen;
752                 }
753         } while(left);
754 }
755
756 static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
757         size_t rlen = len + (is_framed(c) ? 2 : 0);
758
759         if(!rlen) {
760                 return 0;
761         }
762
763         // Check if we need to be able to buffer all data
764
765         if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
766                 if(rlen > c->sndbuf.maxsize) {
767                         errno = EMSGSIZE;
768                         return -1;
769                 }
770
771                 if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
772                         errno = EMSGSIZE;
773                         return -1;
774                 }
775
776                 if(rlen > buffer_free(&c->sndbuf)) {
777                         errno = EWOULDBLOCK;
778                         return 0;
779                 }
780         }
781
782         // Add data to the send buffer.
783
784         if(is_framed(c)) {
785                 uint16_t len16 = len;
786                 buffer_put(&c->sndbuf, &len16, sizeof(len16));
787                 assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
788         } else {
789                 len = buffer_put(&c->sndbuf, data, len);
790
791                 if(len <= 0) {
792                         errno = EWOULDBLOCK;
793                         return 0;
794                 }
795         }
796
797         c->snd.last += rlen;
798
799         // Don't send anything yet if the connection has not fully established yet
800
801         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
802                 return len;
803         }
804
805         ack(c, false);
806
807         if(!timespec_isset(&c->rtrx_timeout)) {
808                 start_retransmit_timer(c);
809         }
810
811         if(!timespec_isset(&c->conn_timeout)) {
812                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
813                 c->conn_timeout.tv_sec += c->utcp->timeout;
814         }
815
816         return len;
817 }
818
819
820 /* In the send buffer we can have multiple frames, each prefixed with their
821    length. However, the first frame might already have been partially sent. The
822    variable c->frame_offset tracks how much of a partial frame is left at the
823    start. If it is 0, it means there is no partial frame, and the first two
824    bytes in the send buffer are the length of the first frame.
825
826    After sending an MSS sized packet, we need to calculate the new frame_offset
827    value, since it is likely that the next packet will also have a partial frame
828    at the start. We do this by scanning the previously sent packet for frame
829    headers, to find out how many bytes of the last frame are left to send.
830 */
831 static void ack_unreliable_framed(struct utcp_connection *c) {
832         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
833         assert(left > 0);
834
835         struct {
836                 struct hdr hdr;
837                 uint8_t data[];
838         } *pkt = c->utcp->pkt;
839
840         pkt->hdr.src = c->src;
841         pkt->hdr.dst = c->dst;
842         pkt->hdr.ack = c->rcv.nxt;
843         pkt->hdr.ctl = ACK | MF;
844         pkt->hdr.aux = 0;
845
846         bool sent_packet = false;
847
848         while(left >= c->utcp->mss) {
849                 pkt->hdr.wnd = c->frame_offset;
850                 uint32_t seglen = c->utcp->mss;
851
852                 pkt->hdr.seq = c->snd.nxt;
853
854                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
855
856                 c->snd.nxt += seglen;
857                 c->snd.una = c->snd.nxt;
858                 left -= seglen;
859
860                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
861                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
862                 sent_packet = true;
863
864                 // Calculate the new frame offset
865                 while(c->frame_offset < seglen) {
866                         uint16_t framelen;
867                         buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
868                         c->frame_offset += framelen + 2;
869                 }
870
871                 buffer_discard(&c->sndbuf, seglen);
872                 c->frame_offset -= seglen;
873         };
874
875         if(sent_packet) {
876                 if(left) {
877                         // We sent one packet but we have partial data left, (re)start the flush timer
878                         if(!timespec_isset(&c->rtrx_timeout)) {
879                                 c->flush_needed = true;
880                         }
881
882                         start_flush_timer(c);
883                 } else {
884                         // There is no partial data in the send buffer, so stop the flush timer
885                         stop_retransmit_timer(c);
886                 }
887         } else if(left && !timespec_isset(&c->rtrx_timeout)) {
888                 // We have partial data and we didn't start the flush timer yet
889                 c->flush_needed = true;
890                 start_flush_timer(c);
891         }
892 }
893
894 static void flush_unreliable_framed(struct utcp_connection *c) {
895         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
896
897         /* If the MSS dropped since last time ack_unreliable_frame() was called,
898           we might now have more than one segment worth of data left.
899         */
900         if(left > c->utcp->mss) {
901                 ack_unreliable_framed(c);
902                 left = seqdiff(c->snd.last, c->snd.nxt);
903                 assert(left <= c->utcp->mss);
904         }
905
906         if(left) {
907                 struct {
908                         struct hdr hdr;
909                         uint8_t data[];
910                 } *pkt = c->utcp->pkt;
911
912                 pkt->hdr.src = c->src;
913                 pkt->hdr.dst = c->dst;
914                 pkt->hdr.seq = c->snd.nxt;
915                 pkt->hdr.ack = c->rcv.nxt;
916                 pkt->hdr.wnd = c->frame_offset;
917                 pkt->hdr.ctl = ACK | MF;
918                 pkt->hdr.aux = 0;
919
920                 uint32_t seglen = left;
921
922                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
923                 buffer_discard(&c->sndbuf, seglen);
924
925                 c->snd.nxt += seglen;
926                 c->snd.una = c->snd.nxt;
927
928                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
929                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
930         }
931
932         c->frame_offset = 0;
933         stop_retransmit_timer(c);
934 }
935
936
937 static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
938         if(len > MAX_UNRELIABLE_SIZE) {
939                 errno = EMSGSIZE;
940                 return -1;
941         }
942
943         size_t rlen = len + (is_framed(c) ? 2 : 0);
944
945         if(rlen > buffer_free(&c->sndbuf)) {
946                 if(rlen > c->sndbuf.maxsize) {
947                         errno = EMSGSIZE;
948                         return -1;
949                 } else {
950                         errno = EWOULDBLOCK;
951                         return 0;
952                 }
953         }
954
955         // Don't send anything yet if the connection has not fully established yet
956
957         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
958                 return len;
959         }
960
961         if(is_framed(c)) {
962                 uint16_t framelen = len;
963                 buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
964         }
965
966         buffer_put(&c->sndbuf, data, len);
967
968         c->snd.last += rlen;
969
970         if(is_framed(c)) {
971                 ack_unreliable_framed(c);
972         } else {
973                 ack(c, false);
974                 c->snd.una = c->snd.nxt = c->snd.last;
975                 buffer_discard(&c->sndbuf, c->sndbuf.used);
976         }
977
978         return len;
979 }
980
981 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
982         if(c->reapable) {
983                 debug(c, "send() called on closed connection\n");
984                 errno = EBADF;
985                 return -1;
986         }
987
988         switch(c->state) {
989         case CLOSED:
990         case LISTEN:
991                 debug(c, "send() called on unconnected connection\n");
992                 errno = ENOTCONN;
993                 return -1;
994
995         case SYN_SENT:
996         case SYN_RECEIVED:
997         case ESTABLISHED:
998         case CLOSE_WAIT:
999                 break;
1000
1001         case FIN_WAIT_1:
1002         case FIN_WAIT_2:
1003         case CLOSING:
1004         case LAST_ACK:
1005         case TIME_WAIT:
1006                 debug(c, "send() called on closed connection\n");
1007                 errno = EPIPE;
1008                 return -1;
1009         }
1010
1011         if(!data && len) {
1012                 errno = EFAULT;
1013                 return -1;
1014         }
1015
1016         if(is_reliable(c)) {
1017                 return utcp_send_reliable(c, data, len);
1018         } else {
1019                 return utcp_send_unreliable(c, data, len);
1020         }
1021 }
1022
1023 static void swap_ports(struct hdr *hdr) {
1024         uint16_t tmp = hdr->src;
1025         hdr->src = hdr->dst;
1026         hdr->dst = tmp;
1027 }
1028
1029 static void fast_retransmit(struct utcp_connection *c) {
1030         if(c->state == CLOSED || c->snd.last == c->snd.una) {
1031                 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
1032                 return;
1033         }
1034
1035         struct utcp *utcp = c->utcp;
1036
1037         struct {
1038                 struct hdr hdr;
1039                 uint8_t data[];
1040         } *pkt = c->utcp->pkt;
1041
1042         pkt->hdr.src = c->src;
1043         pkt->hdr.dst = c->dst;
1044         pkt->hdr.wnd = c->rcvbuf.maxsize;
1045         pkt->hdr.aux = 0;
1046
1047         switch(c->state) {
1048         case ESTABLISHED:
1049         case FIN_WAIT_1:
1050         case CLOSE_WAIT:
1051         case CLOSING:
1052         case LAST_ACK:
1053                 // Send unacked data again.
1054                 pkt->hdr.seq = c->snd.una;
1055                 pkt->hdr.ack = c->rcv.nxt;
1056                 pkt->hdr.ctl = ACK;
1057                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1058
1059                 if(fin_wanted(c, c->snd.una + len)) {
1060                         len--;
1061                         pkt->hdr.ctl |= FIN;
1062                 }
1063
1064                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1065                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1066                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
1067                 break;
1068
1069         default:
1070                 break;
1071         }
1072 }
1073
/* Resend whatever is outstanding for this connection and back off the
 * retransmission timeout.
 *
 * What gets sent depends on the connection state: the SYN, the SYN+ACK, or
 * the first segment of unacknowledged data. For unreliable framed connections
 * with pending send-buffer data, the buffer is flushed instead and the
 * function returns without changing the RTO. If there is nothing to
 * retransmit, the retransmit timer is stopped.
 */
static void retransmit(struct utcp_connection *c) {
	if(c->state == CLOSED || c->snd.last == c->snd.una) {
		debug(c, "retransmit() called but nothing to retransmit!\n");
		stop_retransmit_timer(c);
		return;
	}

	struct utcp *utcp = c->utcp;

	// Notify the optional retransmit callback, but only for reliable connections.
	if(utcp->retransmit && is_reliable(c)) {
		utcp->retransmit(c);
	}

	// The packet is built in the buffer pointed to by utcp->pkt.
	struct {
		struct hdr hdr;
		uint8_t data[];
	} *pkt = c->utcp->pkt;

	pkt->hdr.src = c->src;
	pkt->hdr.dst = c->dst;
	pkt->hdr.wnd = c->rcvbuf.maxsize;
	pkt->hdr.aux = 0;

	switch(c->state) {
	case SYN_SENT:
		// Send our SYN again
		pkt->hdr.seq = c->snd.iss;
		pkt->hdr.ack = 0;
		pkt->hdr.ctl = SYN;
		// Attach a 4-byte auxiliary payload: first byte 1, last byte the low
		// three bits of the connection flags.
		// NOTE(review): presumably the AUX_INIT header that utcp_recv() parses
		// (init[0] version, init[3] flags) -- confirm.
		pkt->hdr.aux = 0x0101;
		pkt->data[0] = 1;
		pkt->data[1] = 0;
		pkt->data[2] = 0;
		pkt->data[3] = c->flags & 0x7;
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
		break;

	case SYN_RECEIVED:
		// Send SYNACK again
		pkt->hdr.seq = c->snd.nxt;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = SYN | ACK;
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
		utcp->send(utcp, pkt, sizeof(pkt->hdr));
		break;

	case ESTABLISHED:
	case FIN_WAIT_1:
	case CLOSE_WAIT:
	case CLOSING:
	case LAST_ACK:
		// Unreliable framed connections do not retransmit; the timer is used
		// to flush data still sitting in the send buffer. Returning here
		// skips the timer restart and RTO backoff below.
		if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
			flush_unreliable_framed(c);
			return;
		}

		// Send unacked data again.
		pkt->hdr.seq = c->snd.una;
		pkt->hdr.ack = c->rcv.nxt;
		pkt->hdr.ctl = ACK;
		uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);

		// If the FIN falls inside this segment, set the flag and drop one
		// byte from the payload length.
		if(fin_wanted(c, c->snd.una + len)) {
			len--;
			pkt->hdr.ctl |= FIN;
		}

		// RFC 5681 slow start after timeout
		uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
		c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
		c->snd.cwnd = utcp->mss;
		debug_cwnd(c);

		buffer_copy(&c->sndbuf, pkt->data, 0, len);
		print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
		utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);

		// Rewind snd.nxt to just past the segment that was resent.
		c->snd.nxt = c->snd.una + len;
		break;

	case CLOSED:
	case LISTEN:
	case TIME_WAIT:
	case FIN_WAIT_2:
		// We shouldn't need to retransmit anything in this state.
#ifdef UTCP_DEBUG
		abort();
#endif
		stop_retransmit_timer(c);
		goto cleanup;
	}

	// Re-arm the timer, then double the RTO for next time, capped at MAX_RTO.
	start_retransmit_timer(c);
	c->rto *= 2;

	if(c->rto > MAX_RTO) {
		c->rto = MAX_RTO;
	}

	c->rtt_start.tv_sec = 0; // invalidate RTT timer
	c->dupack = 0; // cancel any ongoing fast recovery

cleanup:
	return;
}
1180
1181 /* Update receive buffer and SACK entries after consuming data.
1182  *
1183  * Situation:
1184  *
1185  * |.....0000..1111111111.....22222......3333|
1186  * |---------------^
1187  *
1188  * 0..3 represent the SACK entries. The ^ indicates up to which point we want
1189  * to remove data from the receive buffer. The idea is to substract "len"
1190  * from the offset of all the SACK entries, and then remove/cut down entries
1191  * that are shifted to before the start of the receive buffer.
1192  *
1193  * There are three cases:
1194  * - the SACK entry is after ^, in that case just change the offset.
1195  * - the SACK entry starts before and ends after ^, so we have to
1196  *   change both its offset and size.
1197  * - the SACK entry is completely before ^, in that case delete it.
1198  */
1199 static void sack_consume(struct utcp_connection *c, size_t len) {
1200         debug(c, "sack_consume %lu\n", (unsigned long)len);
1201
1202         if(len > c->rcvbuf.used) {
1203                 debug(c, "all SACK entries consumed\n");
1204                 c->sacks[0].len = 0;
1205                 return;
1206         }
1207
1208         buffer_discard(&c->rcvbuf, len);
1209
1210         for(int i = 0; i < NSACKS && c->sacks[i].len;) {
1211                 if(len < c->sacks[i].offset) {
1212                         c->sacks[i].offset -= len;
1213                         i++;
1214                 } else if(len < c->sacks[i].offset + c->sacks[i].len) {
1215                         c->sacks[i].len -= len - c->sacks[i].offset;
1216                         c->sacks[i].offset = 0;
1217                         i++;
1218                 } else {
1219                         if(i < NSACKS - 1) {
1220                                 memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
1221                                 c->sacks[NSACKS - 1].len = 0;
1222                         } else {
1223                                 c->sacks[i].len = 0;
1224                                 break;
1225                         }
1226                 }
1227         }
1228
1229         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1230                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1231         }
1232 }
1233
1234 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1235         debug(c, "out of order packet, offset %u\n", offset);
1236         // Packet loss or reordering occured. Store the data in the buffer.
1237         ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
1238
1239         if(rxd <= 0) {
1240                 debug(c, "packet outside receive buffer, dropping\n");
1241                 return;
1242         }
1243
1244         if((size_t)rxd < len) {
1245                 debug(c, "packet partially outside receive buffer\n");
1246                 len = rxd;
1247         }
1248
1249         // Make note of where we put it.
1250         for(int i = 0; i < NSACKS; i++) {
1251                 if(!c->sacks[i].len) { // nothing to merge, add new entry
1252                         debug(c, "new SACK entry %d\n", i);
1253                         c->sacks[i].offset = offset;
1254                         c->sacks[i].len = rxd;
1255                         break;
1256                 } else if(offset < c->sacks[i].offset) {
1257                         if(offset + rxd < c->sacks[i].offset) { // insert before
1258                                 if(!c->sacks[NSACKS - 1].len) { // only if room left
1259                                         debug(c, "insert SACK entry at %d\n", i);
1260                                         memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
1261                                         c->sacks[i].offset = offset;
1262                                         c->sacks[i].len = rxd;
1263                                 } else {
1264                                         debug(c, "SACK entries full, dropping packet\n");
1265                                 }
1266
1267                                 break;
1268                         } else { // merge
1269                                 debug(c, "merge with start of SACK entry at %d\n", i);
1270                                 c->sacks[i].offset = offset;
1271                                 break;
1272                         }
1273                 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1274                         if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1275                                 debug(c, "merge with end of SACK entry at %d\n", i);
1276                                 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1277                                 // TODO: handle potential merge with next entry
1278                         }
1279
1280                         break;
1281                 }
1282         }
1283
1284         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1285                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1286         }
1287 }
1288
1289 static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1290         uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1291
1292         // Put the data into the receive buffer
1293         handle_out_of_order(c, offset + in_order_offset, data, len);
1294 }
1295
1296 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1297         if(c->recv) {
1298                 ssize_t rxd = c->recv(c, data, len);
1299
1300                 if(rxd != (ssize_t)len) {
1301                         // TODO: handle the application not accepting all data.
1302                         abort();
1303                 }
1304         }
1305
1306         // Check if we can process out-of-order data now.
1307         if(c->sacks[0].len && len >= c->sacks[0].offset) {
1308                 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1309
1310                 if(len < c->sacks[0].offset + c->sacks[0].len) {
1311                         size_t offset = len;
1312                         len = c->sacks[0].offset + c->sacks[0].len;
1313                         size_t remainder = len - offset;
1314
1315                         ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);
1316
1317                         if(rxd != (ssize_t)remainder) {
1318                                 // TODO: handle the application not accepting all data.
1319                                 abort();
1320                         }
1321                 }
1322         }
1323
1324         if(c->rcvbuf.used) {
1325                 sack_consume(c, len);
1326         }
1327
1328         c->rcv.nxt += len;
1329 }
1330
1331 static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
1332         // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
1333         uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1334         handle_out_of_order(c, in_order_offset, data, len);
1335
1336         // While we have full frames at the start, give them to the application
1337         while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
1338                 uint16_t framelen;
1339                 buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
1340
1341                 if(framelen > c->sacks[0].len - 2) {
1342                         break;
1343                 }
1344
1345                 if(c->recv) {
1346                         ssize_t rxd;
1347                         uint32_t realoffset = c->rcvbuf.offset + 2;
1348
1349                         if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
1350                                 realoffset -= c->rcvbuf.size;
1351                         }
1352
1353                         if(realoffset > c->rcvbuf.size - framelen) {
1354                                 // The buffer wraps, we need to copy
1355                                 uint8_t buf[framelen];
1356                                 buffer_copy(&c->rcvbuf, buf, 2, framelen);
1357                                 rxd = c->recv(c, buf, framelen);
1358                         } else {
1359                                 // The frame is contiguous in the receive buffer
1360                                 rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
1361                         }
1362
1363                         if(rxd != (ssize_t)framelen) {
1364                                 // TODO: handle the application not accepting all data.
1365                                 abort();
1366                         }
1367                 }
1368
1369                 sack_consume(c, framelen + 2);
1370         }
1371
1372         c->rcv.nxt += len;
1373 }
1374
1375 static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1376         // Fast path for unfragmented packets
1377         if(!hdr->wnd && !(hdr->ctl & MF)) {
1378                 if(c->recv) {
1379                         c->recv(c, data, len);
1380                 }
1381
1382                 c->rcv.nxt = hdr->seq + len;
1383                 return;
1384         }
1385
1386         // Ensure reassembled packet are not larger than 64 kiB
1387         if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
1388                 return;
1389         }
1390
1391         // Don't accept out of order fragments
1392         if(hdr->wnd && hdr->seq != c->rcv.nxt) {
1393                 return;
1394         }
1395
1396         // Reset the receive buffer for the first fragment
1397         if(!hdr->wnd) {
1398                 buffer_clear(&c->rcvbuf);
1399         }
1400
1401         ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len);
1402
1403         if(rxd != (ssize_t)len) {
1404                 return;
1405         }
1406
1407         // Send the packet if it's the final fragment
1408         if(!(hdr->ctl & MF)) {
1409                 buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);
1410         }
1411
1412         c->rcv.nxt = hdr->seq + len;
1413 }
1414
1415 static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1416         bool in_order = hdr->seq == c->rcv.nxt;
1417         c->rcv.nxt = hdr->seq + len;
1418
1419         const uint8_t *ptr = data;
1420         size_t left = len;
1421
1422         // Does it start with a partial frame?
1423         if(hdr->wnd) {
1424                 // Only accept the data if it is in order
1425                 if(in_order && c->rcvbuf.used) {
1426                         // In order, append it to the receive buffer
1427                         buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
1428
1429                         if(hdr->wnd <= len) {
1430                                 // We have a full frame
1431                                 c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
1432                         }
1433                 }
1434
1435                 // Exit early if there is other data in this frame
1436                 if(hdr->wnd > len) {
1437                         if(!in_order) {
1438                                 buffer_clear(&c->rcvbuf);
1439                         }
1440
1441                         return;
1442                 }
1443
1444                 ptr += hdr->wnd;
1445                 left -= hdr->wnd;
1446         }
1447
1448         // We now start with new frames, so clear any data in the receive buffer
1449         buffer_clear(&c->rcvbuf);
1450
1451         // Handle whole frames
1452         while(left >= 2) {
1453                 uint16_t framelen;
1454                 memcpy(&framelen, ptr, sizeof(framelen));
1455
1456                 if(left < (size_t)framelen + 2) {
1457                         break;
1458                 }
1459
1460                 c->recv(c, ptr + 2, framelen);
1461                 ptr += framelen + 2;
1462                 left -= framelen + 2;
1463         }
1464
1465         // Handle partial last frame
1466         if(left) {
1467                 buffer_put(&c->rcvbuf, ptr, left);
1468         }
1469 }
1470
1471 static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1472         if(!is_reliable(c)) {
1473                 if(is_framed(c)) {
1474                         handle_unreliable_framed(c, hdr, data, len);
1475                 } else {
1476                         handle_unreliable(c, hdr, data, len);
1477                 }
1478
1479                 return;
1480         }
1481
1482         uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
1483
1484         if(is_framed(c)) {
1485                 if(offset) {
1486                         handle_out_of_order_framed(c, offset, data, len);
1487                 } else {
1488                         handle_in_order_framed(c, data, len);
1489                 }
1490         } else {
1491                 if(offset) {
1492                         handle_out_of_order(c, offset, data, len);
1493                 } else {
1494                         handle_in_order(c, data, len);
1495                 }
1496         }
1497 }
1498
1499
1500 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1501         const uint8_t *ptr = data;
1502
1503         if(!utcp) {
1504                 errno = EFAULT;
1505                 return -1;
1506         }
1507
1508         if(!len) {
1509                 return 0;
1510         }
1511
1512         if(!data) {
1513                 errno = EFAULT;
1514                 return -1;
1515         }
1516
1517         // Drop packets smaller than the header
1518
1519         struct hdr hdr;
1520
1521         if(len < sizeof(hdr)) {
1522                 print_packet(NULL, "recv", data, len);
1523                 errno = EBADMSG;
1524                 return -1;
1525         }
1526
1527         // Make a copy from the potentially unaligned data to a struct hdr
1528
1529         memcpy(&hdr, ptr, sizeof(hdr));
1530
1531         // Try to match the packet to an existing connection
1532
1533         struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1534         print_packet(c, "recv", data, len);
1535
1536         // Process the header
1537
1538         ptr += sizeof(hdr);
1539         len -= sizeof(hdr);
1540
1541         // Drop packets with an unknown CTL flag
1542
1543         if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) {
1544                 print_packet(NULL, "recv", data, len);
1545                 errno = EBADMSG;
1546                 return -1;
1547         }
1548
1549         // Check for auxiliary headers
1550
1551         const uint8_t *init = NULL;
1552
1553         uint16_t aux = hdr.aux;
1554
1555         while(aux) {
1556                 size_t auxlen = 4 * (aux >> 8) & 0xf;
1557                 uint8_t auxtype = aux & 0xff;
1558
1559                 if(len < auxlen) {
1560                         errno = EBADMSG;
1561                         return -1;
1562                 }
1563
1564                 switch(auxtype) {
1565                 case AUX_INIT:
1566                         if(!(hdr.ctl & SYN) || auxlen != 4) {
1567                                 errno = EBADMSG;
1568                                 return -1;
1569                         }
1570
1571                         init = ptr;
1572                         break;
1573
1574                 default:
1575                         errno = EBADMSG;
1576                         return -1;
1577                 }
1578
1579                 len -= auxlen;
1580                 ptr += auxlen;
1581
1582                 if(!(aux & 0x800)) {
1583                         break;
1584                 }
1585
1586                 if(len < 2) {
1587                         errno = EBADMSG;
1588                         return -1;
1589                 }
1590
1591                 memcpy(&aux, ptr, 2);
1592                 len -= 2;
1593                 ptr += 2;
1594         }
1595
1596         bool has_data = len || (hdr.ctl & (SYN | FIN));
1597
1598         // Is it for a new connection?
1599
1600         if(!c) {
1601                 // Ignore RST packets
1602
1603                 if(hdr.ctl & RST) {
1604                         return 0;
1605                 }
1606
1607                 // Is it a SYN packet and are we LISTENing?
1608
1609                 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1610                         // If we don't want to accept it, send a RST back
1611                         if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1612                                 len = 1;
1613                                 goto reset;
1614                         }
1615
1616                         // Try to allocate memory, otherwise send a RST back
1617                         c = allocate_connection(utcp, hdr.dst, hdr.src);
1618
1619                         if(!c) {
1620                                 len = 1;
1621                                 goto reset;
1622                         }
1623
1624                         // Parse auxilliary information
1625                         if(init) {
1626                                 if(init[0] < 1) {
1627                                         len = 1;
1628                                         goto reset;
1629                                 }
1630
1631                                 c->flags = init[3] & 0x7;
1632                         } else {
1633                                 c->flags = UTCP_TCP;
1634                         }
1635
1636 synack:
1637                         // Return SYN+ACK, go to SYN_RECEIVED state
1638                         c->snd.wnd = hdr.wnd;
1639                         c->rcv.irs = hdr.seq;
1640                         c->rcv.nxt = c->rcv.irs + 1;
1641                         set_state(c, SYN_RECEIVED);
1642
1643                         struct {
1644                                 struct hdr hdr;
1645                                 uint8_t data[4];
1646                         } pkt;
1647
1648                         pkt.hdr.src = c->src;
1649                         pkt.hdr.dst = c->dst;
1650                         pkt.hdr.ack = c->rcv.irs + 1;
1651                         pkt.hdr.seq = c->snd.iss;
1652                         pkt.hdr.wnd = c->rcvbuf.maxsize;
1653                         pkt.hdr.ctl = SYN | ACK;
1654
1655                         if(init) {
1656                                 pkt.hdr.aux = 0x0101;
1657                                 pkt.data[0] = 1;
1658                                 pkt.data[1] = 0;
1659                                 pkt.data[2] = 0;
1660                                 pkt.data[3] = c->flags & 0x7;
1661                                 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1662                                 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1663                         } else {
1664                                 pkt.hdr.aux = 0;
1665                                 print_packet(c, "send", &pkt, sizeof(hdr));
1666                                 utcp->send(utcp, &pkt, sizeof(hdr));
1667                         }
1668
1669                         start_retransmit_timer(c);
1670                 } else {
1671                         // No, we don't want your packets, send a RST back
1672                         len = 1;
1673                         goto reset;
1674                 }
1675
1676                 return 0;
1677         }
1678
1679         debug(c, "state %s\n", strstate[c->state]);
1680
1681         // In case this is for a CLOSED connection, ignore the packet.
1682         // TODO: make it so incoming packets can never match a CLOSED connection.
1683
1684         if(c->state == CLOSED) {
1685                 debug(c, "got packet for closed connection\n");
1686                 return 0;
1687         }
1688
1689         // It is for an existing connection.
1690
1691         // 1. Drop invalid packets.
1692
1693         // 1a. Drop packets that should not happen in our current state.
1694
1695         switch(c->state) {
1696         case SYN_SENT:
1697         case SYN_RECEIVED:
1698         case ESTABLISHED:
1699         case FIN_WAIT_1:
1700         case FIN_WAIT_2:
1701         case CLOSE_WAIT:
1702         case CLOSING:
1703         case LAST_ACK:
1704         case TIME_WAIT:
1705                 break;
1706
1707         default:
1708 #ifdef UTCP_DEBUG
1709                 abort();
1710 #endif
1711                 break;
1712         }
1713
1714         // 1b. Discard data that is not in our receive window.
1715
1716         if(is_reliable(c)) {
1717                 bool acceptable;
1718
1719                 if(c->state == SYN_SENT) {
1720                         acceptable = true;
1721                 } else if(len == 0) {
1722                         acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1723                 } else {
1724                         int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1725
1726                         // cut already accepted front overlapping
1727                         if(rcv_offset < 0) {
1728                                 acceptable = len > (size_t) - rcv_offset;
1729
1730                                 if(acceptable) {
1731                                         ptr -= rcv_offset;
1732                                         len += rcv_offset;
1733                                         hdr.seq -= rcv_offset;
1734                                 }
1735                         } else {
1736                                 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1737                         }
1738                 }
1739
1740                 if(!acceptable) {
1741                         debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1742
1743                         // Ignore unacceptable RST packets.
1744                         if(hdr.ctl & RST) {
1745                                 return 0;
1746                         }
1747
1748                         // Otherwise, continue processing.
1749                         len = 0;
1750                 }
1751         } else {
1752 #if UTCP_DEBUG
1753                 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1754
1755                 if(rcv_offset) {
1756                         debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
1757                 }
1758
1759 #endif
1760         }
1761
1762         c->snd.wnd = hdr.wnd; // TODO: move below
1763
1764         // 1c. Drop packets with an invalid ACK.
1765         // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1766         // (= snd.una + c->sndbuf.used).
1767
1768         if(!is_reliable(c)) {
1769                 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1770                         hdr.ack = c->snd.una;
1771                 }
1772         }
1773
1774         if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1775                 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1776
1777                 // Ignore unacceptable RST packets.
1778                 if(hdr.ctl & RST) {
1779                         return 0;
1780                 }
1781
1782                 goto reset;
1783         }
1784
1785         // 2. Handle RST packets
1786
1787         if(hdr.ctl & RST) {
1788                 switch(c->state) {
1789                 case SYN_SENT:
1790                         if(!(hdr.ctl & ACK)) {
1791                                 return 0;
1792                         }
1793
1794                         // The peer has refused our connection.
1795                         set_state(c, CLOSED);
1796                         errno = ECONNREFUSED;
1797
1798                         if(c->recv) {
1799                                 c->recv(c, NULL, 0);
1800                         }
1801
1802                         if(c->poll && !c->reapable) {
1803                                 c->poll(c, 0);
1804                         }
1805
1806                         return 0;
1807
1808                 case SYN_RECEIVED:
1809                         if(hdr.ctl & ACK) {
1810                                 return 0;
1811                         }
1812
1813                         // We haven't told the application about this connection yet. Silently delete.
1814                         free_connection(c);
1815                         return 0;
1816
1817                 case ESTABLISHED:
1818                 case FIN_WAIT_1:
1819                 case FIN_WAIT_2:
1820                 case CLOSE_WAIT:
1821                         if(hdr.ctl & ACK) {
1822                                 return 0;
1823                         }
1824
1825                         // The peer has aborted our connection.
1826                         set_state(c, CLOSED);
1827                         errno = ECONNRESET;
1828
1829                         if(c->recv) {
1830                                 c->recv(c, NULL, 0);
1831                         }
1832
1833                         if(c->poll && !c->reapable) {
1834                                 c->poll(c, 0);
1835                         }
1836
1837                         return 0;
1838
1839                 case CLOSING:
1840                 case LAST_ACK:
1841                 case TIME_WAIT:
1842                         if(hdr.ctl & ACK) {
1843                                 return 0;
1844                         }
1845
1846                         // As far as the application is concerned, the connection has already been closed.
1847                         // If it has called utcp_close() already, we can immediately free this connection.
1848                         if(c->reapable) {
1849                                 free_connection(c);
1850                                 return 0;
1851                         }
1852
1853                         // Otherwise, immediately move to the CLOSED state.
1854                         set_state(c, CLOSED);
1855                         return 0;
1856
1857                 default:
1858 #ifdef UTCP_DEBUG
1859                         abort();
1860 #endif
1861                         break;
1862                 }
1863         }
1864
1865         uint32_t advanced;
1866
1867         if(!(hdr.ctl & ACK)) {
1868                 advanced = 0;
1869                 goto skip_ack;
1870         }
1871
1872         // 3. Advance snd.una
1873
1874         advanced = seqdiff(hdr.ack, c->snd.una);
1875
1876         if(advanced) {
1877                 // RTT measurement
1878                 if(c->rtt_start.tv_sec) {
1879                         if(c->rtt_seq == hdr.ack) {
1880                                 struct timespec now;
1881                                 clock_gettime(UTCP_CLOCK, &now);
1882                                 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1883                                 update_rtt(c, diff);
1884                                 c->rtt_start.tv_sec = 0;
1885                         } else if(c->rtt_seq < hdr.ack) {
1886                                 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1887                                 c->rtt_start.tv_sec = 0;
1888                         }
1889                 }
1890
1891                 int32_t data_acked = advanced;
1892
1893                 switch(c->state) {
1894                 case SYN_SENT:
1895                 case SYN_RECEIVED:
1896                         data_acked--;
1897                         break;
1898
1899                 // TODO: handle FIN as well.
1900                 default:
1901                         break;
1902                 }
1903
1904                 assert(data_acked >= 0);
1905
1906 #ifndef NDEBUG
1907                 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1908                 assert(data_acked <= bufused);
1909 #endif
1910
1911                 if(data_acked) {
1912                         buffer_discard(&c->sndbuf, data_acked);
1913
1914                         if(is_reliable(c)) {
1915                                 c->do_poll = true;
1916                         }
1917                 }
1918
1919                 // Also advance snd.nxt if possible
1920                 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1921                         c->snd.nxt = hdr.ack;
1922                 }
1923
1924                 c->snd.una = hdr.ack;
1925
1926                 if(c->dupack) {
1927                         if(c->dupack >= 3) {
1928                                 debug(c, "fast recovery ended\n");
1929                                 c->snd.cwnd = c->snd.ssthresh;
1930                         }
1931
1932                         c->dupack = 0;
1933                 }
1934
1935                 // Increase the congestion window according to RFC 5681
1936                 if(c->snd.cwnd < c->snd.ssthresh) {
1937                         c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1938                 } else {
1939                         c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1940                 }
1941
1942                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1943                         c->snd.cwnd = c->sndbuf.maxsize;
1944                 }
1945
1946                 debug_cwnd(c);
1947
1948                 // Check if we have sent a FIN that is now ACKed.
1949                 switch(c->state) {
1950                 case FIN_WAIT_1:
1951                         if(c->snd.una == c->snd.last) {
1952                                 set_state(c, FIN_WAIT_2);
1953                         }
1954
1955                         break;
1956
1957                 case CLOSING:
1958                         if(c->snd.una == c->snd.last) {
1959                                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1960                                 c->conn_timeout.tv_sec += utcp->timeout;
1961                                 set_state(c, TIME_WAIT);
1962                         }
1963
1964                         break;
1965
1966                 default:
1967                         break;
1968                 }
1969         } else {
1970                 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1971                         c->dupack++;
1972                         debug(c, "duplicate ACK %d\n", c->dupack);
1973
1974                         if(c->dupack == 3) {
1975                                 // RFC 5681 fast recovery
1976                                 debug(c, "fast recovery started\n", c->dupack);
1977                                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1978                                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1979                                 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1980
1981                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1982                                         c->snd.cwnd = c->sndbuf.maxsize;
1983                                 }
1984
1985                                 debug_cwnd(c);
1986
1987                                 fast_retransmit(c);
1988                         } else if(c->dupack > 3) {
1989                                 c->snd.cwnd += utcp->mss;
1990
1991                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1992                                         c->snd.cwnd = c->sndbuf.maxsize;
1993                                 }
1994
1995                                 debug_cwnd(c);
1996                         }
1997
1998                         // We got an ACK which indicates the other side did get one of our packets.
1999                         // Reset the retransmission timer to avoid going to slow start,
2000                         // but don't touch the connection timeout.
2001                         start_retransmit_timer(c);
2002                 }
2003         }
2004
2005         // 4. Update timers
2006
2007         if(advanced) {
2008                 if(c->snd.una == c->snd.last) {
2009                         stop_retransmit_timer(c);
2010                         timespec_clear(&c->conn_timeout);
2011                 } else if(is_reliable(c)) {
2012                         start_retransmit_timer(c);
2013                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2014                         c->conn_timeout.tv_sec += utcp->timeout;
2015                 }
2016         }
2017
2018 skip_ack:
2019         // 5. Process SYN stuff
2020
2021         if(hdr.ctl & SYN) {
2022                 switch(c->state) {
2023                 case SYN_SENT:
2024
2025                         // This is a SYNACK. It should always have ACKed the SYN.
2026                         if(!advanced) {
2027                                 goto reset;
2028                         }
2029
2030                         c->rcv.irs = hdr.seq;
2031                         c->rcv.nxt = hdr.seq + 1;
2032
2033                         if(c->shut_wr) {
2034                                 c->snd.last++;
2035                                 set_state(c, FIN_WAIT_1);
2036                         } else {
2037                                 c->do_poll = true;
2038                                 set_state(c, ESTABLISHED);
2039                         }
2040
2041                         break;
2042
2043                 case SYN_RECEIVED:
2044                         // This is a retransmit of a SYN, send back the SYNACK.
2045                         goto synack;
2046
2047                 case ESTABLISHED:
2048                 case FIN_WAIT_1:
2049                 case FIN_WAIT_2:
2050                 case CLOSE_WAIT:
2051                 case CLOSING:
2052                 case LAST_ACK:
2053                 case TIME_WAIT:
2054                         // This could be a retransmission. Ignore the SYN flag, but send an ACK back.
2055                         break;
2056
2057                 default:
2058 #ifdef UTCP_DEBUG
2059                         abort();
2060 #endif
2061                         return 0;
2062                 }
2063         }
2064
2065         // 6. Process new data
2066
2067         if(c->state == SYN_RECEIVED) {
2068                 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
2069                 if(!advanced) {
2070                         goto reset;
2071                 }
2072
2073                 // Are we still LISTENing?
2074                 if(utcp->accept) {
2075                         utcp->accept(c, c->src);
2076                 }
2077
2078                 if(c->state != ESTABLISHED) {
2079                         set_state(c, CLOSED);
2080                         c->reapable = true;
2081                         goto reset;
2082                 }
2083         }
2084
2085         if(len) {
2086                 switch(c->state) {
2087                 case SYN_SENT:
2088                 case SYN_RECEIVED:
2089                         // This should never happen.
2090 #ifdef UTCP_DEBUG
2091                         abort();
2092 #endif
2093                         return 0;
2094
2095                 case ESTABLISHED:
2096                 case FIN_WAIT_1:
2097                 case FIN_WAIT_2:
2098                         break;
2099
2100                 case CLOSE_WAIT:
2101                 case CLOSING:
2102                 case LAST_ACK:
2103                 case TIME_WAIT:
2104                         // Ehm no, We should never receive more data after a FIN.
2105                         goto reset;
2106
2107                 default:
2108 #ifdef UTCP_DEBUG
2109                         abort();
2110 #endif
2111                         return 0;
2112                 }
2113
2114                 handle_incoming_data(c, &hdr, ptr, len);
2115         }
2116
2117         // 7. Process FIN stuff
2118
2119         if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
2120                 switch(c->state) {
2121                 case SYN_SENT:
2122                 case SYN_RECEIVED:
2123                         // This should never happen.
2124 #ifdef UTCP_DEBUG
2125                         abort();
2126 #endif
2127                         break;
2128
2129                 case ESTABLISHED:
2130                         set_state(c, CLOSE_WAIT);
2131                         break;
2132
2133                 case FIN_WAIT_1:
2134                         set_state(c, CLOSING);
2135                         break;
2136
2137                 case FIN_WAIT_2:
2138                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2139                         c->conn_timeout.tv_sec += utcp->timeout;
2140                         set_state(c, TIME_WAIT);
2141                         break;
2142
2143                 case CLOSE_WAIT:
2144                 case CLOSING:
2145                 case LAST_ACK:
2146                 case TIME_WAIT:
2147                         // Ehm, no. We should never receive a second FIN.
2148                         goto reset;
2149
2150                 default:
2151 #ifdef UTCP_DEBUG
2152                         abort();
2153 #endif
2154                         break;
2155                 }
2156
2157                 // FIN counts as one sequence number
2158                 c->rcv.nxt++;
2159                 len++;
2160
2161                 // Inform the application that the peer closed its end of the connection.
2162                 if(c->recv) {
2163                         errno = 0;
2164                         c->recv(c, NULL, 0);
2165                 }
2166         }
2167
2168         // Now we send something back if:
2169         // - we received data, so we have to send back an ACK
2170         //   -> sendatleastone = true
2171         // - or we got an ack, so we should maybe send a bit more data
2172         //   -> sendatleastone = false
2173
2174         if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
2175                 ack(c, has_data);
2176         }
2177
2178         return 0;
2179
2180 reset:
2181         swap_ports(&hdr);
2182         hdr.wnd = 0;
2183         hdr.aux = 0;
2184
2185         if(hdr.ctl & ACK) {
2186                 hdr.seq = hdr.ack;
2187                 hdr.ctl = RST;
2188         } else {
2189                 hdr.ack = hdr.seq + len;
2190                 hdr.seq = 0;
2191                 hdr.ctl = RST | ACK;
2192         }
2193
2194         print_packet(c, "send", &hdr, sizeof(hdr));
2195         utcp->send(utcp, &hdr, sizeof(hdr));
2196         return 0;
2197
2198 }
2199
2200 int utcp_shutdown(struct utcp_connection *c, int dir) {
2201         debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
2202
2203         if(!c) {
2204                 errno = EFAULT;
2205                 return -1;
2206         }
2207
2208         if(c->reapable) {
2209                 debug(c, "shutdown() called on closed connection\n");
2210                 errno = EBADF;
2211                 return -1;
2212         }
2213
2214         if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
2215                 errno = EINVAL;
2216                 return -1;
2217         }
2218
2219         // TCP does not have a provision for stopping incoming packets.
2220         // The best we can do is to just ignore them.
2221         if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
2222                 c->recv = NULL;
2223         }
2224
2225         // The rest of the code deals with shutting down writes.
2226         if(dir == UTCP_SHUT_RD) {
2227                 return 0;
2228         }
2229
2230         // Only process shutting down writes once.
2231         if(c->shut_wr) {
2232                 return 0;
2233         }
2234
2235         c->shut_wr = true;
2236
2237         switch(c->state) {
2238         case CLOSED:
2239         case LISTEN:
2240                 errno = ENOTCONN;
2241                 return -1;
2242
2243         case SYN_SENT:
2244                 return 0;
2245
2246         case SYN_RECEIVED:
2247         case ESTABLISHED:
2248                 if(!is_reliable(c) && is_framed(c)) {
2249                         flush_unreliable_framed(c);
2250                 }
2251
2252                 set_state(c, FIN_WAIT_1);
2253                 break;
2254
2255         case FIN_WAIT_1:
2256         case FIN_WAIT_2:
2257                 return 0;
2258
2259         case CLOSE_WAIT:
2260                 set_state(c, CLOSING);
2261                 break;
2262
2263         case CLOSING:
2264         case LAST_ACK:
2265         case TIME_WAIT:
2266                 return 0;
2267         }
2268
2269         c->snd.last++;
2270
2271         ack(c, !is_reliable(c));
2272
2273         if(!timespec_isset(&c->rtrx_timeout)) {
2274                 start_retransmit_timer(c);
2275         }
2276
2277         return 0;
2278 }
2279
2280 static bool reset_connection(struct utcp_connection *c) {
2281         if(!c) {
2282                 errno = EFAULT;
2283                 return false;
2284         }
2285
2286         if(c->reapable) {
2287                 debug(c, "abort() called on closed connection\n");
2288                 errno = EBADF;
2289                 return false;
2290         }
2291
2292         c->recv = NULL;
2293         c->poll = NULL;
2294
2295         switch(c->state) {
2296         case CLOSED:
2297                 return true;
2298
2299         case LISTEN:
2300         case SYN_SENT:
2301         case CLOSING:
2302         case LAST_ACK:
2303         case TIME_WAIT:
2304                 set_state(c, CLOSED);
2305                 return true;
2306
2307         case SYN_RECEIVED:
2308         case ESTABLISHED:
2309         case FIN_WAIT_1:
2310         case FIN_WAIT_2:
2311         case CLOSE_WAIT:
2312                 set_state(c, CLOSED);
2313                 break;
2314         }
2315
2316         // Send RST
2317
2318         struct hdr hdr;
2319
2320         hdr.src = c->src;
2321         hdr.dst = c->dst;
2322         hdr.seq = c->snd.nxt;
2323         hdr.ack = 0;
2324         hdr.wnd = 0;
2325         hdr.ctl = RST;
2326
2327         print_packet(c, "send", &hdr, sizeof(hdr));
2328         c->utcp->send(c->utcp, &hdr, sizeof(hdr));
2329         return true;
2330 }
2331
2332 // Closes all the opened connections
2333 void utcp_abort_all_connections(struct utcp *utcp) {
2334         if(!utcp) {
2335                 errno = EINVAL;
2336                 return;
2337         }
2338
2339         for(int i = 0; i < utcp->nconnections; i++) {
2340                 struct utcp_connection *c = utcp->connections[i];
2341
2342                 if(c->reapable || c->state == CLOSED) {
2343                         continue;
2344                 }
2345
2346                 utcp_recv_t old_recv = c->recv;
2347                 utcp_poll_t old_poll = c->poll;
2348
2349                 reset_connection(c);
2350
2351                 if(old_recv) {
2352                         errno = 0;
2353                         old_recv(c, NULL, 0);
2354                 }
2355
2356                 if(old_poll && !c->reapable) {
2357                         errno = 0;
2358                         old_poll(c, 0);
2359                 }
2360         }
2361
2362         return;
2363 }
2364
2365 int utcp_close(struct utcp_connection *c) {
2366         debug(c, "closing\n");
2367
2368         if(c->rcvbuf.used) {
2369                 debug(c, "receive buffer not empty, resetting\n");
2370                 return reset_connection(c) ? 0 : -1;
2371         }
2372
2373         if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
2374                 return -1;
2375         }
2376
2377         c->recv = NULL;
2378         c->poll = NULL;
2379         c->reapable = true;
2380         return 0;
2381 }
2382
2383 int utcp_abort(struct utcp_connection *c) {
2384         if(!reset_connection(c)) {
2385                 return -1;
2386         }
2387
2388         c->reapable = true;
2389         return 0;
2390 }
2391
2392 /* Handle timeouts.
2393  * One call to this function will loop through all connections,
2394  * checking if something needs to be resent or not.
2395  * The return value is the time to the next timeout in milliseconds,
2396  * or maybe a negative value if the timeout is infinite.
2397  */
2398 struct timespec utcp_timeout(struct utcp *utcp) {
2399         struct timespec now;
2400         clock_gettime(UTCP_CLOCK, &now);
2401         struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
2402
2403         for(int i = 0; i < utcp->nconnections; i++) {
2404                 struct utcp_connection *c = utcp->connections[i];
2405
2406                 if(!c) {
2407                         continue;
2408                 }
2409
2410                 // delete connections that have been utcp_close()d.
2411                 if(c->state == CLOSED) {
2412                         if(c->reapable) {
2413                                 debug(c, "reaping\n");
2414                                 free_connection(c);
2415                                 i--;
2416                         }
2417
2418                         continue;
2419                 }
2420
2421                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
2422                         errno = ETIMEDOUT;
2423                         c->state = CLOSED;
2424
2425                         if(c->recv) {
2426                                 c->recv(c, NULL, 0);
2427                         }
2428
2429                         if(c->poll && !c->reapable) {
2430                                 c->poll(c, 0);
2431                         }
2432
2433                         continue;
2434                 }
2435
2436                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2437                         debug(c, "retransmitting after timeout\n");
2438                         retransmit(c);
2439                 }
2440
2441                 if(c->poll) {
2442                         if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2443                                 c->do_poll = false;
2444                                 uint32_t len = is_framed(c) ? min(buffer_free(&c->sndbuf), MAX_UNRELIABLE_SIZE) : buffer_free(&c->sndbuf);
2445
2446                                 if(len) {
2447                                         c->poll(c, len);
2448                                 }
2449                         } else if(c->state == CLOSED) {
2450                                 c->poll(c, 0);
2451                         }
2452                 }
2453
2454                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2455                         next = c->conn_timeout;
2456                 }
2457
2458                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2459                         next = c->rtrx_timeout;
2460                 }
2461         }
2462
2463         struct timespec diff;
2464
2465         timespec_sub(&next, &now, &diff);
2466
2467         return diff;
2468 }
2469
2470 bool utcp_is_active(struct utcp *utcp) {
2471         if(!utcp) {
2472                 return false;
2473         }
2474
2475         for(int i = 0; i < utcp->nconnections; i++)
2476                 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
2477                         return true;
2478                 }
2479
2480         return false;
2481 }
2482
2483 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2484         if(!send) {
2485                 errno = EFAULT;
2486                 return NULL;
2487         }
2488
2489         struct utcp *utcp = calloc(1, sizeof(*utcp));
2490
2491         if(!utcp) {
2492                 return NULL;
2493         }
2494
2495         utcp_set_mtu(utcp, DEFAULT_MTU);
2496
2497         if(!utcp->pkt) {
2498                 free(utcp);
2499                 return NULL;
2500         }
2501
2502         if(!CLOCK_GRANULARITY) {
2503                 struct timespec res;
2504                 clock_getres(UTCP_CLOCK, &res);
2505                 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2506         }
2507
2508         utcp->accept = accept;
2509         utcp->pre_accept = pre_accept;
2510         utcp->send = send;
2511         utcp->priv = priv;
2512         utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
2513
2514         return utcp;
2515 }
2516
2517 void utcp_exit(struct utcp *utcp) {
2518         if(!utcp) {
2519                 return;
2520         }
2521
2522         for(int i = 0; i < utcp->nconnections; i++) {
2523                 struct utcp_connection *c = utcp->connections[i];
2524
2525                 if(!c->reapable) {
2526                         if(c->recv) {
2527                                 c->recv(c, NULL, 0);
2528                         }
2529
2530                         if(c->poll && !c->reapable) {
2531                                 c->poll(c, 0);
2532                         }
2533                 }
2534
2535                 buffer_exit(&c->rcvbuf);
2536                 buffer_exit(&c->sndbuf);
2537                 free(c);
2538         }
2539
2540         free(utcp->connections);
2541         free(utcp->pkt);
2542         free(utcp);
2543 }
2544
2545 uint16_t utcp_get_mtu(struct utcp *utcp) {
2546         return utcp ? utcp->mtu : 0;
2547 }
2548
2549 uint16_t utcp_get_mss(struct utcp *utcp) {
2550         return utcp ? utcp->mss : 0;
2551 }
2552
2553 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2554         if(!utcp) {
2555                 return;
2556         }
2557
2558         if(mtu <= sizeof(struct hdr)) {
2559                 return;
2560         }
2561
2562         if(mtu > utcp->mtu) {
2563                 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2564
2565                 if(!new) {
2566                         return;
2567                 }
2568
2569                 utcp->pkt = new;
2570         }
2571
2572         utcp->mtu = mtu;
2573         utcp->mss = mtu - sizeof(struct hdr);
2574 }
2575
2576 void utcp_reset_timers(struct utcp *utcp) {
2577         if(!utcp) {
2578                 return;
2579         }
2580
2581         struct timespec now, then;
2582
2583         clock_gettime(UTCP_CLOCK, &now);
2584
2585         then = now;
2586
2587         then.tv_sec += utcp->timeout;
2588
2589         for(int i = 0; i < utcp->nconnections; i++) {
2590                 struct utcp_connection *c = utcp->connections[i];
2591
2592                 if(c->reapable) {
2593                         continue;
2594                 }
2595
2596                 if(timespec_isset(&c->rtrx_timeout)) {
2597                         c->rtrx_timeout = now;
2598                 }
2599
2600                 if(timespec_isset(&c->conn_timeout)) {
2601                         c->conn_timeout = then;
2602                 }
2603
2604                 c->rtt_start.tv_sec = 0;
2605
2606                 if(c->rto > START_RTO) {
2607                         c->rto = START_RTO;
2608                 }
2609         }
2610 }
2611
2612 int utcp_get_user_timeout(struct utcp *u) {
2613         return u ? u->timeout : 0;
2614 }
2615
2616 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2617         if(u) {
2618                 u->timeout = timeout;
2619         }
2620 }
2621
2622 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2623         return c ? c->sndbuf.maxsize : 0;
2624 }
2625
2626 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2627         if(!c) {
2628                 return 0;
2629         }
2630
2631         switch(c->state) {
2632         case SYN_SENT:
2633         case SYN_RECEIVED:
2634         case ESTABLISHED:
2635         case CLOSE_WAIT:
2636                 return buffer_free(&c->sndbuf);
2637
2638         default:
2639                 return 0;
2640         }
2641 }
2642
2643 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2644         if(!c) {
2645                 return;
2646         }
2647
2648         c->sndbuf.maxsize = size;
2649
2650         if(c->sndbuf.maxsize != size) {
2651                 c->sndbuf.maxsize = -1;
2652         }
2653
2654         c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2655 }
2656
2657 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2658         return c ? c->rcvbuf.maxsize : 0;
2659 }
2660
2661 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2662         if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2663                 return buffer_free(&c->rcvbuf);
2664         } else {
2665                 return 0;
2666         }
2667 }
2668
2669 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2670         if(!c) {
2671                 return;
2672         }
2673
2674         c->rcvbuf.maxsize = size;
2675
2676         if(c->rcvbuf.maxsize != size) {
2677                 c->rcvbuf.maxsize = -1;
2678         }
2679 }
2680
2681 size_t utcp_get_sendq(struct utcp_connection *c) {
2682         return c->sndbuf.used;
2683 }
2684
2685 size_t utcp_get_recvq(struct utcp_connection *c) {
2686         return c->rcvbuf.used;
2687 }
2688
2689 bool utcp_get_nodelay(struct utcp_connection *c) {
2690         return c ? c->nodelay : false;
2691 }
2692
2693 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2694         if(c) {
2695                 c->nodelay = nodelay;
2696         }
2697 }
2698
2699 bool utcp_get_keepalive(struct utcp_connection *c) {
2700         return c ? c->keepalive : false;
2701 }
2702
2703 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2704         if(c) {
2705                 c->keepalive = keepalive;
2706         }
2707 }
2708
2709 size_t utcp_get_outq(struct utcp_connection *c) {
2710         return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
2711 }
2712
2713 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
2714         if(c) {
2715                 c->recv = recv;
2716         }
2717 }
2718
2719 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2720         if(c) {
2721                 c->poll = poll;
2722                 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2723         }
2724 }
2725
2726 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2727         if(utcp) {
2728                 utcp->accept = accept;
2729                 utcp->pre_accept = pre_accept;
2730         }
2731 }
2732
2733 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2734         if(!c || c->reapable) {
2735                 return;
2736         }
2737
2738         if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2739                 return;
2740         }
2741
2742         if(expect) {
2743                 // If we expect data, start the connection timer.
2744                 if(!timespec_isset(&c->conn_timeout)) {
2745                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2746                         c->conn_timeout.tv_sec += c->utcp->timeout;
2747                 }
2748         } else {
2749                 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2750                 if(c->snd.una == c->snd.last) {
2751                         timespec_clear(&c->conn_timeout);
2752                 }
2753         }
2754 }
2755
2756 void utcp_offline(struct utcp *utcp, bool offline) {
2757         struct timespec now;
2758         clock_gettime(UTCP_CLOCK, &now);
2759
2760         for(int i = 0; i < utcp->nconnections; i++) {
2761                 struct utcp_connection *c = utcp->connections[i];
2762
2763                 if(c->reapable) {
2764                         continue;
2765                 }
2766
2767                 utcp_expect_data(c, offline);
2768
2769                 if(!offline) {
2770                         if(timespec_isset(&c->rtrx_timeout)) {
2771                                 c->rtrx_timeout = now;
2772                         }
2773
2774                         utcp->connections[i]->rtt_start.tv_sec = 0;
2775
2776                         if(c->rto > START_RTO) {
2777                                 c->rto = START_RTO;
2778                         }
2779                 }
2780         }
2781 }
2782
2783 void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
2784         utcp->retransmit = cb;
2785 }
2786
2787 void utcp_set_clock_granularity(long granularity) {
2788         CLOCK_GRANULARITY = granularity;
2789 }
2790
2791 int utcp_get_flush_timeout(struct utcp *utcp) {
2792         return utcp->flush_timeout;
2793 }
2794
2795 void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
2796         utcp->flush_timeout = milliseconds;
2797 }
2798
2799 bool utcp_get_flush_needed(struct utcp_connection *c) {
2800         bool value = c->flush_needed;
2801         c->flush_needed = false;
2802         return value;
2803 }