]> git.meshlink.io Git - meshlink/blob - src/utcp.c
Ensure the flush timer is started if we never had any full packets to send.
[meshlink] / src / utcp.c
1 /*
2     utcp.c -- Userspace TCP
3     Copyright (C) 2014-2017 Guus Sliepen <guus@tinc-vpn.org>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <stdint.h>
27 #include <stdbool.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <time.h>
31
32 #include "utcp_priv.h"
33
34 #ifndef EBADMSG
35 #define EBADMSG         104
36 #endif
37
38 #ifndef SHUT_RDWR
39 #define SHUT_RDWR 2
40 #endif
41
42 #ifdef poll
43 #undef poll
44 #endif
45
46 #ifndef UTCP_CLOCK
47 #if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__)
48 #define UTCP_CLOCK CLOCK_MONOTONIC_RAW
49 #else
50 #define UTCP_CLOCK CLOCK_MONOTONIC
51 #endif
52 #endif
53
54 static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) {
55         r->tv_sec = a->tv_sec - b->tv_sec;
56         r->tv_nsec = a->tv_nsec - b->tv_nsec;
57
58         if(r->tv_nsec < 0) {
59                 r->tv_sec--, r->tv_nsec += NSEC_PER_SEC;
60         }
61 }
62
63 static int32_t timespec_diff_usec(const struct timespec *a, const struct timespec *b) {
64         return (a->tv_sec - b->tv_sec) * 1000000 + (a->tv_nsec - b->tv_nsec) / 1000;
65 }
66
67 static bool timespec_lt(const struct timespec *a, const struct timespec *b) {
68         if(a->tv_sec == b->tv_sec) {
69                 return a->tv_nsec < b->tv_nsec;
70         } else {
71                 return a->tv_sec < b->tv_sec;
72         }
73 }
74
75 static void timespec_clear(struct timespec *a) {
76         a->tv_sec = 0;
77         a->tv_nsec = 0;
78 }
79
80 static bool timespec_isset(const struct timespec *a) {
81         return a->tv_sec;
82 }
83
84 static long CLOCK_GRANULARITY; // usec
85
86 static inline size_t min(size_t a, size_t b) {
87         return a < b ? a : b;
88 }
89
90 static inline size_t max(size_t a, size_t b) {
91         return a > b ? a : b;
92 }
93
94 #ifdef UTCP_DEBUG
95 #include <stdarg.h>
96
97 #ifndef UTCP_DEBUG_DATALEN
98 #define UTCP_DEBUG_DATALEN 20
99 #endif
100
101 static void debug(struct utcp_connection *c, const char *format, ...) {
102         struct timespec tv;
103         char buf[1024];
104         int len;
105
106         clock_gettime(CLOCK_REALTIME, &tv);
107         len = snprintf(buf, sizeof(buf), "%ld.%06lu %u:%u ", (long)tv.tv_sec, tv.tv_nsec / 1000, c ? c->src : 0, c ? c->dst : 0);
108         va_list ap;
109         va_start(ap, format);
110         len += vsnprintf(buf + len, sizeof(buf) - len, format, ap);
111         va_end(ap);
112
113         if(len > 0 && (size_t)len < sizeof(buf)) {
114                 fwrite(buf, len, 1, stderr);
115         }
116 }
117
118 static void print_packet(struct utcp_connection *c, const char *dir, const void *pkt, size_t len) {
119         struct hdr hdr;
120
121         if(len < sizeof(hdr)) {
122                 debug(c, "%s: short packet (%lu bytes)\n", dir, (unsigned long)len);
123                 return;
124         }
125
126         memcpy(&hdr, pkt, sizeof(hdr));
127
128         uint32_t datalen;
129
130         if(len > sizeof(hdr)) {
131                 datalen = min(len - sizeof(hdr), UTCP_DEBUG_DATALEN);
132         } else {
133                 datalen = 0;
134         }
135
136
137         const uint8_t *data = (uint8_t *)pkt + sizeof(hdr);
138         char str[datalen * 2 + 1];
139         char *p = str;
140
141         for(uint32_t i = 0; i < datalen; i++) {
142                 *p++ = "0123456789ABCDEF"[data[i] >> 4];
143                 *p++ = "0123456789ABCDEF"[data[i] & 15];
144         }
145
146         *p = 0;
147
148         debug(c, "%s: len %lu src %u dst %u seq %u ack %u wnd %u aux %x ctl %s%s%s%s%s data %s\n",
149               dir, (unsigned long)len, hdr.src, hdr.dst, hdr.seq, hdr.ack, hdr.wnd, hdr.aux,
150               hdr.ctl & SYN ? "SYN" : "",
151               hdr.ctl & RST ? "RST" : "",
152               hdr.ctl & FIN ? "FIN" : "",
153               hdr.ctl & ACK ? "ACK" : "",
154               hdr.ctl & MF ? "MF" : "",
155               str
156              );
157 }
158
159 static void debug_cwnd(struct utcp_connection *c) {
160         debug(c, "snd.cwnd %u snd.ssthresh %u\n", c->snd.cwnd, ~c->snd.ssthresh ? c->snd.ssthresh : 0);
161 }
162 #else
163 #define debug(...) do {} while(0)
164 #define print_packet(...) do {} while(0)
165 #define debug_cwnd(...) do {} while(0)
166 #endif
167
168 static void set_state(struct utcp_connection *c, enum state state) {
169         c->state = state;
170
171         if(state == ESTABLISHED) {
172                 timespec_clear(&c->conn_timeout);
173         }
174
175         debug(c, "state %s\n", strstate[state]);
176 }
177
178 static bool fin_wanted(struct utcp_connection *c, uint32_t seq) {
179         if(seq != c->snd.last) {
180                 return false;
181         }
182
183         switch(c->state) {
184         case FIN_WAIT_1:
185         case CLOSING:
186         case LAST_ACK:
187                 return true;
188
189         default:
190                 return false;
191         }
192 }
193
194 static bool is_reliable(struct utcp_connection *c) {
195         return c->flags & UTCP_RELIABLE;
196 }
197
198 static bool is_framed(struct utcp_connection *c) {
199         return c->flags & UTCP_FRAMED;
200 }
201
202 static int32_t seqdiff(uint32_t a, uint32_t b) {
203         return a - b;
204 }
205
206 // Buffer functions
207 static bool buffer_wraps(struct buffer *buf) {
208         return buf->size - buf->offset < buf->used;
209 }
210
211 static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
212         char *newdata = realloc(buf->data, newsize);
213
214         if(!newdata) {
215                 return false;
216         }
217
218         buf->data = newdata;
219
220         if(buffer_wraps(buf)) {
221                 // Shift the right part of the buffer until it hits the end of the new buffer.
222                 // Old situation:
223                 // [345......012]
224                 // New situation:
225                 // [345.........|........012]
226                 uint32_t tailsize = buf->size - buf->offset;
227                 uint32_t newoffset = newsize - tailsize;
228                 memmove(buf->data + newoffset, buf->data + buf->offset, tailsize);
229                 buf->offset = newoffset;
230         }
231
232         buf->size = newsize;
233         return true;
234 }
235
236 // Store data into the buffer
237 static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
238         debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
239
240         // Ensure we don't store more than maxsize bytes in total
241         size_t required = offset + len;
242
243         if(required > buf->maxsize) {
244                 if(offset >= buf->maxsize) {
245                         return 0;
246                 }
247
248                 len = buf->maxsize - offset;
249                 required = buf->maxsize;
250         }
251
252         // Check if we need to resize the buffer
253         if(required > buf->size) {
254                 size_t newsize = buf->size;
255
256                 if(!newsize) {
257                         newsize = 4096;
258                 }
259
260                 do {
261                         newsize *= 2;
262                 } while(newsize < required);
263
264                 if(newsize > buf->maxsize) {
265                         newsize = buf->maxsize;
266                 }
267
268                 if(!buffer_resize(buf, newsize)) {
269                         return -1;
270                 }
271         }
272
273         uint32_t realoffset = buf->offset + offset;
274
275         if(buf->size - buf->offset <= offset) {
276                 // The offset wrapped
277                 realoffset -= buf->size;
278         }
279
280         if(buf->size - realoffset < len) {
281                 // The new chunk of data must be wrapped
282                 memcpy(buf->data + realoffset, data, buf->size - realoffset);
283                 memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
284         } else {
285                 memcpy(buf->data + realoffset, data, len);
286         }
287
288         if(required > buf->used) {
289                 buf->used = required;
290         }
291
292         return len;
293 }
294
295 static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
296         return buffer_put_at(buf, buf->used, data, len);
297 }
298
299 // Copy data from the buffer without removing it.
300 static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
301         // Ensure we don't copy more than is actually stored in the buffer
302         if(offset >= buf->used) {
303                 return 0;
304         }
305
306         if(buf->used - offset < len) {
307                 len = buf->used - offset;
308         }
309
310         uint32_t realoffset = buf->offset + offset;
311
312         if(buf->size - buf->offset <= offset) {
313                 // The offset wrapped
314                 realoffset -= buf->size;
315         }
316
317         if(buf->size - realoffset < len) {
318                 // The data is wrapped
319                 memcpy(data, buf->data + realoffset, buf->size - realoffset);
320                 memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
321         } else {
322                 memcpy(data, buf->data + realoffset, len);
323         }
324
325         return len;
326 }
327
328 // Copy data from the buffer without removing it.
329 static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) {
330         if(!c->recv) {
331                 return len;
332         }
333
334         // Ensure we don't copy more than is actually stored in the buffer
335         if(offset >= buf->used) {
336                 return 0;
337         }
338
339         if(buf->used - offset < len) {
340                 len = buf->used - offset;
341         }
342
343         uint32_t realoffset = buf->offset + offset;
344
345         if(buf->size - buf->offset <= offset) {
346                 // The offset wrapped
347                 realoffset -= buf->size;
348         }
349
350         if(buf->size - realoffset < len) {
351                 // The data is wrapped
352                 ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset);
353
354                 if(rx1 < buf->size - realoffset) {
355                         return rx1;
356                 }
357
358                 // The channel might have been closed by the previous callback
359                 if(!c->recv) {
360                         return len;
361                 }
362
363                 ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset));
364
365                 if(rx2 < 0) {
366                         return rx2;
367                 } else {
368                         return rx1 + rx2;
369                 }
370         } else {
371                 return c->recv(c, buf->data + realoffset, len);
372         }
373 }
374
375 // Discard data from the buffer.
376 static ssize_t buffer_discard(struct buffer *buf, size_t len) {
377         if(buf->used < len) {
378                 len = buf->used;
379         }
380
381         if(buf->size - buf->offset <= len) {
382                 buf->offset -= buf->size;
383         }
384
385         if(buf->used == len) {
386                 buf->offset = 0;
387         } else {
388                 buf->offset += len;
389         }
390
391         buf->used -= len;
392
393         return len;
394 }
395
396 static void buffer_clear(struct buffer *buf) {
397         buf->used = 0;
398         buf->offset = 0;
399 }
400
401 static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
402         if(maxsize < minsize) {
403                 maxsize = minsize;
404         }
405
406         buf->maxsize = maxsize;
407
408         return buf->size >= minsize || buffer_resize(buf, minsize);
409 }
410
411 static void buffer_exit(struct buffer *buf) {
412         free(buf->data);
413         memset(buf, 0, sizeof(*buf));
414 }
415
416 static uint32_t buffer_free(const struct buffer *buf) {
417         return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0;
418 }
419
420 // Connections are stored in a sorted list.
421 // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time.
422
423 static int compare(const void *va, const void *vb) {
424         assert(va && vb);
425
426         const struct utcp_connection *a = *(struct utcp_connection **)va;
427         const struct utcp_connection *b = *(struct utcp_connection **)vb;
428
429         assert(a && b);
430         assert(a->src && b->src);
431
432         int c = (int)a->src - (int)b->src;
433
434         if(c) {
435                 return c;
436         }
437
438         c = (int)a->dst - (int)b->dst;
439         return c;
440 }
441
442 static struct utcp_connection *find_connection(const struct utcp *utcp, uint16_t src, uint16_t dst) {
443         if(!utcp->nconnections) {
444                 return NULL;
445         }
446
447         struct utcp_connection key = {
448                 .src = src,
449                 .dst = dst,
450         }, *keyp = &key;
451         struct utcp_connection **match = bsearch(&keyp, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
452         return match ? *match : NULL;
453 }
454
455 static void free_connection(struct utcp_connection *c) {
456         struct utcp *utcp = c->utcp;
457         struct utcp_connection **cp = bsearch(&c, utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
458
459         assert(cp);
460
461         int i = cp - utcp->connections;
462         memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp));
463         utcp->nconnections--;
464
465         buffer_exit(&c->rcvbuf);
466         buffer_exit(&c->sndbuf);
467         free(c);
468 }
469
470 static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t src, uint16_t dst) {
471         // Check whether this combination of src and dst is free
472
473         if(src) {
474                 if(find_connection(utcp, src, dst)) {
475                         errno = EADDRINUSE;
476                         return NULL;
477                 }
478         } else { // If src == 0, generate a random port number with the high bit set
479                 if(utcp->nconnections >= 32767) {
480                         errno = ENOMEM;
481                         return NULL;
482                 }
483
484                 src = rand() | 0x8000;
485
486                 while(find_connection(utcp, src, dst)) {
487                         src++;
488                 }
489         }
490
491         // Allocate memory for the new connection
492
493         if(utcp->nconnections >= utcp->nallocated) {
494                 if(!utcp->nallocated) {
495                         utcp->nallocated = 4;
496                 } else {
497                         utcp->nallocated *= 2;
498                 }
499
500                 struct utcp_connection **new_array = realloc(utcp->connections, utcp->nallocated * sizeof(*utcp->connections));
501
502                 if(!new_array) {
503                         return NULL;
504                 }
505
506                 utcp->connections = new_array;
507         }
508
509         struct utcp_connection *c = calloc(1, sizeof(*c));
510
511         if(!c) {
512                 return NULL;
513         }
514
515         if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
516                 free(c);
517                 return NULL;
518         }
519
520         if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
521                 buffer_exit(&c->sndbuf);
522                 free(c);
523                 return NULL;
524         }
525
526         // Fill in the details
527
528         c->src = src;
529         c->dst = dst;
530 #ifdef UTCP_DEBUG
531         c->snd.iss = 0;
532 #else
533         c->snd.iss = rand();
534 #endif
535         c->snd.una = c->snd.iss;
536         c->snd.nxt = c->snd.iss + 1;
537         c->snd.last = c->snd.nxt;
538         c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
539         c->snd.ssthresh = ~0;
540         debug_cwnd(c);
541         c->srtt = 0;
542         c->rttvar = 0;
543         c->rto = START_RTO;
544         c->utcp = utcp;
545
546         // Add it to the sorted list of connections
547
548         utcp->connections[utcp->nconnections++] = c;
549         qsort(utcp->connections, utcp->nconnections, sizeof(*utcp->connections), compare);
550
551         return c;
552 }
553
554 static inline uint32_t absdiff(uint32_t a, uint32_t b) {
555         if(a > b) {
556                 return a - b;
557         } else {
558                 return b - a;
559         }
560 }
561
562 // Update RTT variables. See RFC 6298.
563 static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
564         if(!rtt) {
565                 debug(c, "invalid rtt\n");
566                 return;
567         }
568
569         if(!c->srtt) {
570                 c->srtt = rtt;
571                 c->rttvar = rtt / 2;
572         } else {
573                 c->rttvar = (c->rttvar * 3 + absdiff(c->srtt, rtt)) / 4;
574                 c->srtt = (c->srtt * 7 + rtt) / 8;
575         }
576
577         c->rto = c->srtt + max(4 * c->rttvar, CLOCK_GRANULARITY);
578
579         if(c->rto > MAX_RTO) {
580                 c->rto = MAX_RTO;
581         }
582
583         debug(c, "rtt %u srtt %u rttvar %u rto %u\n", rtt, c->srtt, c->rttvar, c->rto);
584 }
585
586 static void start_retransmit_timer(struct utcp_connection *c) {
587         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
588
589         uint32_t rto = c->rto;
590
591         while(rto > USEC_PER_SEC) {
592                 c->rtrx_timeout.tv_sec++;
593                 rto -= USEC_PER_SEC;
594         }
595
596         c->rtrx_timeout.tv_nsec += rto * 1000;
597
598         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
599                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
600                 c->rtrx_timeout.tv_sec++;
601         }
602
603         debug(c, "rtrx_timeout %ld.%06lu\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
604 }
605
606 static void start_flush_timer(struct utcp_connection *c) {
607         clock_gettime(UTCP_CLOCK, &c->rtrx_timeout);
608
609         c->rtrx_timeout.tv_nsec += c->utcp->flush_timeout * 1000000;
610
611         if(c->rtrx_timeout.tv_nsec >= NSEC_PER_SEC) {
612                 c->rtrx_timeout.tv_nsec -= NSEC_PER_SEC;
613                 c->rtrx_timeout.tv_sec++;
614         }
615
616         debug(c, "rtrx_timeout %ld.%06lu (flush)\n", c->rtrx_timeout.tv_sec, c->rtrx_timeout.tv_nsec);
617 }
618
619 static void stop_retransmit_timer(struct utcp_connection *c) {
620         timespec_clear(&c->rtrx_timeout);
621         debug(c, "rtrx_timeout cleared\n");
622 }
623
624 struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv, uint32_t flags) {
625         struct utcp_connection *c = allocate_connection(utcp, 0, dst);
626
627         if(!c) {
628                 return NULL;
629         }
630
631         assert((flags & ~0x1f) == 0);
632
633         c->flags = flags;
634         c->recv = recv;
635         c->priv = priv;
636
637         struct {
638                 struct hdr hdr;
639                 uint8_t init[4];
640         } pkt;
641
642         pkt.hdr.src = c->src;
643         pkt.hdr.dst = c->dst;
644         pkt.hdr.seq = c->snd.iss;
645         pkt.hdr.ack = 0;
646         pkt.hdr.wnd = c->rcvbuf.maxsize;
647         pkt.hdr.ctl = SYN;
648         pkt.hdr.aux = 0x0101;
649         pkt.init[0] = 1;
650         pkt.init[1] = 0;
651         pkt.init[2] = 0;
652         pkt.init[3] = flags & 0x7;
653
654         set_state(c, SYN_SENT);
655
656         print_packet(c, "send", &pkt, sizeof(pkt));
657         utcp->send(utcp, &pkt, sizeof(pkt));
658
659         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
660         c->conn_timeout.tv_sec += utcp->timeout;
661
662         start_retransmit_timer(c);
663
664         return c;
665 }
666
667 struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) {
668         return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP);
669 }
670
671 void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) {
672         if(c->reapable || c->state != SYN_RECEIVED) {
673                 debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]);
674                 return;
675         }
676
677         debug(c, "accepted %p %p\n", c, recv, priv);
678         c->recv = recv;
679         c->priv = priv;
680         c->do_poll = true;
681         set_state(c, ESTABLISHED);
682 }
683
684 static void ack(struct utcp_connection *c, bool sendatleastone) {
685         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
686         int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE;
687
688         assert(left >= 0);
689
690         if(cwndleft <= 0) {
691                 left = 0;
692         } else if(cwndleft < left) {
693                 left = cwndleft;
694
695                 if(!sendatleastone || cwndleft > c->utcp->mss) {
696                         left -= left % c->utcp->mss;
697                 }
698         }
699
700         debug(c, "cwndleft %d left %d\n", cwndleft, left);
701
702         if(!left && !sendatleastone) {
703                 return;
704         }
705
706         struct {
707                 struct hdr hdr;
708                 uint8_t data[];
709         } *pkt = c->utcp->pkt;
710
711         pkt->hdr.src = c->src;
712         pkt->hdr.dst = c->dst;
713         pkt->hdr.ack = c->rcv.nxt;
714         pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0;
715         pkt->hdr.ctl = ACK;
716         pkt->hdr.aux = 0;
717
718         do {
719                 uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
720                 pkt->hdr.seq = c->snd.nxt;
721
722                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
723
724                 c->snd.nxt += seglen;
725                 left -= seglen;
726
727                 if(!is_reliable(c)) {
728                         if(left) {
729                                 pkt->hdr.ctl |= MF;
730                         } else {
731                                 pkt->hdr.ctl &= ~MF;
732                         }
733                 }
734
735                 if(seglen && fin_wanted(c, c->snd.nxt)) {
736                         seglen--;
737                         pkt->hdr.ctl |= FIN;
738                 }
739
740                 if(!c->rtt_start.tv_sec && is_reliable(c)) {
741                         // Start RTT measurement
742                         clock_gettime(UTCP_CLOCK, &c->rtt_start);
743                         c->rtt_seq = pkt->hdr.seq + seglen;
744                         debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq);
745                 }
746
747                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
748                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
749
750                 if(left && !is_reliable(c)) {
751                         pkt->hdr.wnd += seglen;
752                 }
753         } while(left);
754 }
755
756 static ssize_t utcp_send_reliable(struct utcp_connection *c, const void *data, size_t len) {
757         size_t rlen = len + (is_framed(c) ? 2 : 0);
758
759         if(!rlen) {
760                 return 0;
761         }
762
763         // Check if we need to be able to buffer all data
764
765         if(c->flags & (UTCP_NO_PARTIAL | UTCP_FRAMED)) {
766                 if(rlen > c->sndbuf.maxsize) {
767                         errno = EMSGSIZE;
768                         return -1;
769                 }
770
771                 if((c->flags & UTCP_FRAMED) && len > MAX_UNRELIABLE_SIZE) {
772                         errno = EMSGSIZE;
773                         return -1;
774                 }
775
776                 if(rlen > buffer_free(&c->sndbuf)) {
777                         errno = EWOULDBLOCK;
778                         return 0;
779                 }
780         }
781
782         // Add data to the send buffer.
783
784         if(is_framed(c)) {
785                 uint16_t len16 = len;
786                 buffer_put(&c->sndbuf, &len16, sizeof(len16));
787                 assert(buffer_put(&c->sndbuf, data, len) == (ssize_t)len);
788         } else {
789                 len = buffer_put(&c->sndbuf, data, len);
790
791                 if(len <= 0) {
792                         errno = EWOULDBLOCK;
793                         return 0;
794                 }
795         }
796
797         c->snd.last += rlen;
798
799         // Don't send anything yet if the connection has not fully established yet
800
801         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
802                 return len;
803         }
804
805         ack(c, false);
806
807         if(!timespec_isset(&c->rtrx_timeout)) {
808                 start_retransmit_timer(c);
809         }
810
811         if(!timespec_isset(&c->conn_timeout)) {
812                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
813                 c->conn_timeout.tv_sec += c->utcp->timeout;
814         }
815
816         return len;
817 }
818
819
820 /* In the send buffer we can have multiple frames, each prefixed with their
821    length. However, the first frame might already have been partially sent. The
822    variable c->frame_offset tracks how much of a partial frame is left at the
823    start. If it is 0, it means there is no partial frame, and the first two
824    bytes in the send buffer are the length of the first frame.
825
826    After sending an MSS sized packet, we need to calculate the new frame_offset
827    value, since it is likely that the next packet will also have a partial frame
828    at the start. We do this by scanning the previously sent packet for frame
829    headers, to find out how many bytes of the last frame are left to send.
830 */
831 static void ack_unreliable_framed(struct utcp_connection *c) {
832         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
833         assert(left > 0);
834
835         struct {
836                 struct hdr hdr;
837                 uint8_t data[];
838         } *pkt = c->utcp->pkt;
839
840         pkt->hdr.src = c->src;
841         pkt->hdr.dst = c->dst;
842         pkt->hdr.ack = c->rcv.nxt;
843         pkt->hdr.ctl = ACK | MF;
844         pkt->hdr.aux = 0;
845
846         bool sent_packet = false;
847
848         while(left >= c->utcp->mss) {
849                 pkt->hdr.wnd = c->frame_offset;
850                 uint32_t seglen = c->utcp->mss;
851
852                 pkt->hdr.seq = c->snd.nxt;
853
854                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
855
856                 c->snd.nxt += seglen;
857                 c->snd.una = c->snd.nxt;
858                 left -= seglen;
859
860                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
861                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
862                 sent_packet = true;
863
864                 // Calculate the new frame offset
865                 while(c->frame_offset < seglen) {
866                         uint16_t framelen;
867                         buffer_copy(&c->sndbuf, &framelen, c->frame_offset, sizeof(framelen));
868                         c->frame_offset += framelen + 2;
869                 }
870
871                 buffer_discard(&c->sndbuf, seglen);
872                 c->frame_offset -= seglen;
873         };
874
875         if(sent_packet) {
876                 if(left) {
877                         // We sent one packet but we have partial data left, (re)start the flush timer
878                         start_flush_timer(c);
879                 } else {
880                         // There is no partial data in the send buffer, so stop the flush timer
881                         stop_retransmit_timer(c);
882                 }
883         } else if(left && !timespec_isset(&c->rtrx_timeout)) {
884                 // We have partial data and we didn't start the flush timer yet
885                 start_flush_timer(c);
886         }
887 }
888
889 static void flush_unreliable_framed(struct utcp_connection *c) {
890         int32_t left = seqdiff(c->snd.last, c->snd.nxt);
891
892         /* If the MSS dropped since last time ack_unreliable_frame() was called,
893           we might now have more than one segment worth of data left.
894         */
895         if(left > c->utcp->mss) {
896                 ack_unreliable_framed(c);
897                 left = seqdiff(c->snd.last, c->snd.nxt);
898                 assert(left <= c->utcp->mss);
899         }
900
901         if(left) {
902                 struct {
903                         struct hdr hdr;
904                         uint8_t data[];
905                 } *pkt = c->utcp->pkt;
906
907                 pkt->hdr.src = c->src;
908                 pkt->hdr.dst = c->dst;
909                 pkt->hdr.seq = c->snd.nxt;
910                 pkt->hdr.ack = c->rcv.nxt;
911                 pkt->hdr.wnd = c->frame_offset;
912                 pkt->hdr.ctl = ACK | MF;
913                 pkt->hdr.aux = 0;
914
915                 uint32_t seglen = left;
916
917                 buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
918                 buffer_discard(&c->sndbuf, seglen);
919
920                 c->snd.nxt += seglen;
921                 c->snd.una = c->snd.nxt;
922
923                 print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
924                 c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
925         }
926
927         c->frame_offset = 0;
928         stop_retransmit_timer(c);
929 }
930
931
932 static ssize_t utcp_send_unreliable(struct utcp_connection *c, const void *data, size_t len) {
933         if(len > MAX_UNRELIABLE_SIZE) {
934                 errno = EMSGSIZE;
935                 return -1;
936         }
937
938         size_t rlen = len + (is_framed(c) ? 2 : 0);
939
940         if(rlen > buffer_free(&c->sndbuf)) {
941                 if(rlen > c->sndbuf.maxsize) {
942                         errno = EMSGSIZE;
943                         return -1;
944                 } else {
945                         errno = EWOULDBLOCK;
946                         return 0;
947                 }
948         }
949
950         // Don't send anything yet if the connection has not fully established yet
951
952         if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
953                 return len;
954         }
955
956         if(is_framed(c)) {
957                 uint16_t framelen = len;
958                 buffer_put(&c->sndbuf, &framelen, sizeof(framelen));
959         }
960
961         buffer_put(&c->sndbuf, data, len);
962
963         c->snd.last += rlen;
964
965         if(is_framed(c)) {
966                 ack_unreliable_framed(c);
967         } else {
968                 ack(c, false);
969                 c->snd.una = c->snd.nxt = c->snd.last;
970                 buffer_discard(&c->sndbuf, c->sndbuf.used);
971         }
972
973         return len;
974 }
975
976 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
977         if(c->reapable) {
978                 debug(c, "send() called on closed connection\n");
979                 errno = EBADF;
980                 return -1;
981         }
982
983         switch(c->state) {
984         case CLOSED:
985         case LISTEN:
986                 debug(c, "send() called on unconnected connection\n");
987                 errno = ENOTCONN;
988                 return -1;
989
990         case SYN_SENT:
991         case SYN_RECEIVED:
992         case ESTABLISHED:
993         case CLOSE_WAIT:
994                 break;
995
996         case FIN_WAIT_1:
997         case FIN_WAIT_2:
998         case CLOSING:
999         case LAST_ACK:
1000         case TIME_WAIT:
1001                 debug(c, "send() called on closed connection\n");
1002                 errno = EPIPE;
1003                 return -1;
1004         }
1005
1006         if(!data && len) {
1007                 errno = EFAULT;
1008                 return -1;
1009         }
1010
1011         if(is_reliable(c)) {
1012                 return utcp_send_reliable(c, data, len);
1013         } else {
1014                 return utcp_send_unreliable(c, data, len);
1015         }
1016 }
1017
1018 static void swap_ports(struct hdr *hdr) {
1019         uint16_t tmp = hdr->src;
1020         hdr->src = hdr->dst;
1021         hdr->dst = tmp;
1022 }
1023
1024 static void fast_retransmit(struct utcp_connection *c) {
1025         if(c->state == CLOSED || c->snd.last == c->snd.una) {
1026                 debug(c, "fast_retransmit() called but nothing to retransmit!\n");
1027                 return;
1028         }
1029
1030         struct utcp *utcp = c->utcp;
1031
1032         struct {
1033                 struct hdr hdr;
1034                 uint8_t data[];
1035         } *pkt = c->utcp->pkt;
1036
1037         pkt->hdr.src = c->src;
1038         pkt->hdr.dst = c->dst;
1039         pkt->hdr.wnd = c->rcvbuf.maxsize;
1040         pkt->hdr.aux = 0;
1041
1042         switch(c->state) {
1043         case ESTABLISHED:
1044         case FIN_WAIT_1:
1045         case CLOSE_WAIT:
1046         case CLOSING:
1047         case LAST_ACK:
1048                 // Send unacked data again.
1049                 pkt->hdr.seq = c->snd.una;
1050                 pkt->hdr.ack = c->rcv.nxt;
1051                 pkt->hdr.ctl = ACK;
1052                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1053
1054                 if(fin_wanted(c, c->snd.una + len)) {
1055                         len--;
1056                         pkt->hdr.ctl |= FIN;
1057                 }
1058
1059                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1060                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1061                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
1062                 break;
1063
1064         default:
1065                 break;
1066         }
1067 }
1068
1069 static void retransmit(struct utcp_connection *c) {
1070         if(c->state == CLOSED || c->snd.last == c->snd.una) {
1071                 debug(c, "retransmit() called but nothing to retransmit!\n");
1072                 stop_retransmit_timer(c);
1073                 return;
1074         }
1075
1076         struct utcp *utcp = c->utcp;
1077
1078         if(utcp->retransmit && is_reliable(c)) {
1079                 utcp->retransmit(c);
1080         }
1081
1082         struct {
1083                 struct hdr hdr;
1084                 uint8_t data[];
1085         } *pkt = c->utcp->pkt;
1086
1087         pkt->hdr.src = c->src;
1088         pkt->hdr.dst = c->dst;
1089         pkt->hdr.wnd = c->rcvbuf.maxsize;
1090         pkt->hdr.aux = 0;
1091
1092         switch(c->state) {
1093         case SYN_SENT:
1094                 // Send our SYN again
1095                 pkt->hdr.seq = c->snd.iss;
1096                 pkt->hdr.ack = 0;
1097                 pkt->hdr.ctl = SYN;
1098                 pkt->hdr.aux = 0x0101;
1099                 pkt->data[0] = 1;
1100                 pkt->data[1] = 0;
1101                 pkt->data[2] = 0;
1102                 pkt->data[3] = c->flags & 0x7;
1103                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + 4);
1104                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + 4);
1105                 break;
1106
1107         case SYN_RECEIVED:
1108                 // Send SYNACK again
1109                 pkt->hdr.seq = c->snd.nxt;
1110                 pkt->hdr.ack = c->rcv.nxt;
1111                 pkt->hdr.ctl = SYN | ACK;
1112                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr));
1113                 utcp->send(utcp, pkt, sizeof(pkt->hdr));
1114                 break;
1115
1116         case ESTABLISHED:
1117         case FIN_WAIT_1:
1118         case CLOSE_WAIT:
1119         case CLOSING:
1120         case LAST_ACK:
1121                 if(!is_reliable(c) && is_framed(c) && c->sndbuf.used) {
1122                         flush_unreliable_framed(c);
1123                         return;
1124                 }
1125
1126                 // Send unacked data again.
1127                 pkt->hdr.seq = c->snd.una;
1128                 pkt->hdr.ack = c->rcv.nxt;
1129                 pkt->hdr.ctl = ACK;
1130                 uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
1131
1132                 if(fin_wanted(c, c->snd.una + len)) {
1133                         len--;
1134                         pkt->hdr.ctl |= FIN;
1135                 }
1136
1137                 // RFC 5681 slow start after timeout
1138                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1139                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1140                 c->snd.cwnd = utcp->mss;
1141                 debug_cwnd(c);
1142
1143                 buffer_copy(&c->sndbuf, pkt->data, 0, len);
1144                 print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
1145                 utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
1146
1147                 c->snd.nxt = c->snd.una + len;
1148                 break;
1149
1150         case CLOSED:
1151         case LISTEN:
1152         case TIME_WAIT:
1153         case FIN_WAIT_2:
1154                 // We shouldn't need to retransmit anything in this state.
1155 #ifdef UTCP_DEBUG
1156                 abort();
1157 #endif
1158                 stop_retransmit_timer(c);
1159                 goto cleanup;
1160         }
1161
1162         start_retransmit_timer(c);
1163         c->rto *= 2;
1164
1165         if(c->rto > MAX_RTO) {
1166                 c->rto = MAX_RTO;
1167         }
1168
1169         c->rtt_start.tv_sec = 0; // invalidate RTT timer
1170         c->dupack = 0; // cancel any ongoing fast recovery
1171
1172 cleanup:
1173         return;
1174 }
1175
1176 /* Update receive buffer and SACK entries after consuming data.
1177  *
1178  * Situation:
1179  *
1180  * |.....0000..1111111111.....22222......3333|
1181  * |---------------^
1182  *
1183  * 0..3 represent the SACK entries. The ^ indicates up to which point we want
1184  * to remove data from the receive buffer. The idea is to substract "len"
1185  * from the offset of all the SACK entries, and then remove/cut down entries
1186  * that are shifted to before the start of the receive buffer.
1187  *
1188  * There are three cases:
1189  * - the SACK entry is after ^, in that case just change the offset.
1190  * - the SACK entry starts before and ends after ^, so we have to
1191  *   change both its offset and size.
1192  * - the SACK entry is completely before ^, in that case delete it.
1193  */
1194 static void sack_consume(struct utcp_connection *c, size_t len) {
1195         debug(c, "sack_consume %lu\n", (unsigned long)len);
1196
1197         if(len > c->rcvbuf.used) {
1198                 debug(c, "all SACK entries consumed\n");
1199                 c->sacks[0].len = 0;
1200                 return;
1201         }
1202
1203         buffer_discard(&c->rcvbuf, len);
1204
1205         for(int i = 0; i < NSACKS && c->sacks[i].len;) {
1206                 if(len < c->sacks[i].offset) {
1207                         c->sacks[i].offset -= len;
1208                         i++;
1209                 } else if(len < c->sacks[i].offset + c->sacks[i].len) {
1210                         c->sacks[i].len -= len - c->sacks[i].offset;
1211                         c->sacks[i].offset = 0;
1212                         i++;
1213                 } else {
1214                         if(i < NSACKS - 1) {
1215                                 memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]);
1216                                 c->sacks[NSACKS - 1].len = 0;
1217                         } else {
1218                                 c->sacks[i].len = 0;
1219                                 break;
1220                         }
1221                 }
1222         }
1223
1224         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1225                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1226         }
1227 }
1228
1229 static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1230         debug(c, "out of order packet, offset %u\n", offset);
1231         // Packet loss or reordering occured. Store the data in the buffer.
1232         ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len);
1233
1234         if(rxd <= 0) {
1235                 debug(c, "packet outside receive buffer, dropping\n");
1236                 return;
1237         }
1238
1239         if((size_t)rxd < len) {
1240                 debug(c, "packet partially outside receive buffer\n");
1241                 len = rxd;
1242         }
1243
1244         // Make note of where we put it.
1245         for(int i = 0; i < NSACKS; i++) {
1246                 if(!c->sacks[i].len) { // nothing to merge, add new entry
1247                         debug(c, "new SACK entry %d\n", i);
1248                         c->sacks[i].offset = offset;
1249                         c->sacks[i].len = rxd;
1250                         break;
1251                 } else if(offset < c->sacks[i].offset) {
1252                         if(offset + rxd < c->sacks[i].offset) { // insert before
1253                                 if(!c->sacks[NSACKS - 1].len) { // only if room left
1254                                         debug(c, "insert SACK entry at %d\n", i);
1255                                         memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]);
1256                                         c->sacks[i].offset = offset;
1257                                         c->sacks[i].len = rxd;
1258                                 } else {
1259                                         debug(c, "SACK entries full, dropping packet\n");
1260                                 }
1261
1262                                 break;
1263                         } else { // merge
1264                                 debug(c, "merge with start of SACK entry at %d\n", i);
1265                                 c->sacks[i].offset = offset;
1266                                 break;
1267                         }
1268                 } else if(offset <= c->sacks[i].offset + c->sacks[i].len) {
1269                         if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge
1270                                 debug(c, "merge with end of SACK entry at %d\n", i);
1271                                 c->sacks[i].len = offset + rxd - c->sacks[i].offset;
1272                                 // TODO: handle potential merge with next entry
1273                         }
1274
1275                         break;
1276                 }
1277         }
1278
1279         for(int i = 0; i < NSACKS && c->sacks[i].len; i++) {
1280                 debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len);
1281         }
1282 }
1283
1284 static void handle_out_of_order_framed(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) {
1285         uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1286
1287         // Put the data into the receive buffer
1288         handle_out_of_order(c, offset + in_order_offset, data, len);
1289 }
1290
1291 static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) {
1292         if(c->recv) {
1293                 ssize_t rxd = c->recv(c, data, len);
1294
1295                 if(rxd != (ssize_t)len) {
1296                         // TODO: handle the application not accepting all data.
1297                         abort();
1298                 }
1299         }
1300
1301         // Check if we can process out-of-order data now.
1302         if(c->sacks[0].len && len >= c->sacks[0].offset) {
1303                 debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset);
1304
1305                 if(len < c->sacks[0].offset + c->sacks[0].len) {
1306                         size_t offset = len;
1307                         len = c->sacks[0].offset + c->sacks[0].len;
1308                         size_t remainder = len - offset;
1309
1310                         ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder);
1311
1312                         if(rxd != (ssize_t)remainder) {
1313                                 // TODO: handle the application not accepting all data.
1314                                 abort();
1315                         }
1316                 }
1317         }
1318
1319         if(c->rcvbuf.used) {
1320                 sack_consume(c, len);
1321         }
1322
1323         c->rcv.nxt += len;
1324 }
1325
1326 static void handle_in_order_framed(struct utcp_connection *c, const void *data, size_t len) {
1327         // Treat it as out of order, since it is unlikely the start of this packet contains the start of a frame.
1328         uint32_t in_order_offset = (c->sacks[0].len && c->sacks[0].offset == 0) ? c->sacks[0].len : 0;
1329         handle_out_of_order(c, in_order_offset, data, len);
1330
1331         // While we have full frames at the start, give them to the application
1332         while(c->sacks[0].len >= 2 && c->sacks[0].offset == 0) {
1333                 uint16_t framelen;
1334                 buffer_copy(&c->rcvbuf, &framelen, 0, sizeof(&framelen));
1335
1336                 if(framelen > c->sacks[0].len - 2) {
1337                         break;
1338                 }
1339
1340                 if(c->recv) {
1341                         ssize_t rxd;
1342                         uint32_t realoffset = c->rcvbuf.offset + 2;
1343
1344                         if(c->rcvbuf.size - c->rcvbuf.offset <= 2) {
1345                                 realoffset -= c->rcvbuf.size;
1346                         }
1347
1348                         if(realoffset > c->rcvbuf.size - framelen) {
1349                                 // The buffer wraps, we need to copy
1350                                 uint8_t buf[framelen];
1351                                 buffer_copy(&c->rcvbuf, buf, 2, framelen);
1352                                 rxd = c->recv(c, buf, framelen);
1353                         } else {
1354                                 // The frame is contiguous in the receive buffer
1355                                 rxd = c->recv(c, c->rcvbuf.data + realoffset, framelen);
1356                         }
1357
1358                         if(rxd != (ssize_t)framelen) {
1359                                 // TODO: handle the application not accepting all data.
1360                                 abort();
1361                         }
1362                 }
1363
1364                 sack_consume(c, framelen + 2);
1365         }
1366
1367         c->rcv.nxt += len;
1368 }
1369
1370 static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1371         // Fast path for unfragmented packets
1372         if(!hdr->wnd && !(hdr->ctl & MF)) {
1373                 if(c->recv) {
1374                         c->recv(c, data, len);
1375                 }
1376
1377                 c->rcv.nxt = hdr->seq + len;
1378                 return;
1379         }
1380
1381         // Ensure reassembled packet are not larger than 64 kiB
1382         if(hdr->wnd > MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) {
1383                 return;
1384         }
1385
1386         // Don't accept out of order fragments
1387         if(hdr->wnd && hdr->seq != c->rcv.nxt) {
1388                 return;
1389         }
1390
1391         // Reset the receive buffer for the first fragment
1392         if(!hdr->wnd) {
1393                 buffer_clear(&c->rcvbuf);
1394         }
1395
1396         ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len);
1397
1398         if(rxd != (ssize_t)len) {
1399                 return;
1400         }
1401
1402         // Send the packet if it's the final fragment
1403         if(!(hdr->ctl & MF)) {
1404                 buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len);
1405         }
1406
1407         c->rcv.nxt = hdr->seq + len;
1408 }
1409
1410 static void handle_unreliable_framed(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1411         bool in_order = hdr->seq == c->rcv.nxt;
1412         c->rcv.nxt = hdr->seq + len;
1413
1414         const uint8_t *ptr = data;
1415         size_t left = len;
1416
1417         // Does it start with a partial frame?
1418         if(hdr->wnd) {
1419                 // Only accept the data if it is in order
1420                 if(in_order && c->rcvbuf.used) {
1421                         // In order, append it to the receive buffer
1422                         buffer_put(&c->rcvbuf, data, min(hdr->wnd, len));
1423
1424                         if(hdr->wnd <= len) {
1425                                 // We have a full frame
1426                                 c->recv(c, (uint8_t *)c->rcvbuf.data + 2, c->rcvbuf.used - 2);
1427                         }
1428                 }
1429
1430                 // Exit early if there is other data in this frame
1431                 if(hdr->wnd > len) {
1432                         if(!in_order) {
1433                                 buffer_clear(&c->rcvbuf);
1434                         }
1435
1436                         return;
1437                 }
1438
1439                 ptr += hdr->wnd;
1440                 left -= hdr->wnd;
1441         }
1442
1443         // We now start with new frames, so clear any data in the receive buffer
1444         buffer_clear(&c->rcvbuf);
1445
1446         // Handle whole frames
1447         while(left > 2) {
1448                 uint16_t framelen;
1449                 memcpy(&framelen, ptr, sizeof(framelen));
1450
1451                 if(left <= (size_t)framelen + 2) {
1452                         break;
1453                 }
1454
1455                 c->recv(c, ptr + 2, framelen);
1456                 ptr += framelen + 2;
1457                 left -= framelen + 2;
1458         }
1459
1460         // Handle partial last frame
1461         if(left) {
1462                 buffer_put(&c->rcvbuf, ptr, left);
1463         }
1464 }
1465
1466 static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) {
1467         if(!is_reliable(c)) {
1468                 if(is_framed(c)) {
1469                         handle_unreliable_framed(c, hdr, data, len);
1470                 } else {
1471                         handle_unreliable(c, hdr, data, len);
1472                 }
1473
1474                 return;
1475         }
1476
1477         uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt);
1478
1479         if(is_framed(c)) {
1480                 if(offset) {
1481                         handle_out_of_order_framed(c, offset, data, len);
1482                 } else {
1483                         handle_in_order_framed(c, data, len);
1484                 }
1485         } else {
1486                 if(offset) {
1487                         handle_out_of_order(c, offset, data, len);
1488                 } else {
1489                         handle_in_order(c, data, len);
1490                 }
1491         }
1492 }
1493
1494
1495 ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
1496         const uint8_t *ptr = data;
1497
1498         if(!utcp) {
1499                 errno = EFAULT;
1500                 return -1;
1501         }
1502
1503         if(!len) {
1504                 return 0;
1505         }
1506
1507         if(!data) {
1508                 errno = EFAULT;
1509                 return -1;
1510         }
1511
1512         // Drop packets smaller than the header
1513
1514         struct hdr hdr;
1515
1516         if(len < sizeof(hdr)) {
1517                 print_packet(NULL, "recv", data, len);
1518                 errno = EBADMSG;
1519                 return -1;
1520         }
1521
1522         // Make a copy from the potentially unaligned data to a struct hdr
1523
1524         memcpy(&hdr, ptr, sizeof(hdr));
1525
1526         // Try to match the packet to an existing connection
1527
1528         struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
1529         print_packet(c, "recv", data, len);
1530
1531         // Process the header
1532
1533         ptr += sizeof(hdr);
1534         len -= sizeof(hdr);
1535
1536         // Drop packets with an unknown CTL flag
1537
1538         if(hdr.ctl & ~(SYN | ACK | RST | FIN | MF)) {
1539                 print_packet(NULL, "recv", data, len);
1540                 errno = EBADMSG;
1541                 return -1;
1542         }
1543
1544         // Check for auxiliary headers
1545
1546         const uint8_t *init = NULL;
1547
1548         uint16_t aux = hdr.aux;
1549
1550         while(aux) {
1551                 size_t auxlen = 4 * (aux >> 8) & 0xf;
1552                 uint8_t auxtype = aux & 0xff;
1553
1554                 if(len < auxlen) {
1555                         errno = EBADMSG;
1556                         return -1;
1557                 }
1558
1559                 switch(auxtype) {
1560                 case AUX_INIT:
1561                         if(!(hdr.ctl & SYN) || auxlen != 4) {
1562                                 errno = EBADMSG;
1563                                 return -1;
1564                         }
1565
1566                         init = ptr;
1567                         break;
1568
1569                 default:
1570                         errno = EBADMSG;
1571                         return -1;
1572                 }
1573
1574                 len -= auxlen;
1575                 ptr += auxlen;
1576
1577                 if(!(aux & 0x800)) {
1578                         break;
1579                 }
1580
1581                 if(len < 2) {
1582                         errno = EBADMSG;
1583                         return -1;
1584                 }
1585
1586                 memcpy(&aux, ptr, 2);
1587                 len -= 2;
1588                 ptr += 2;
1589         }
1590
1591         bool has_data = len || (hdr.ctl & (SYN | FIN));
1592
1593         // Is it for a new connection?
1594
1595         if(!c) {
1596                 // Ignore RST packets
1597
1598                 if(hdr.ctl & RST) {
1599                         return 0;
1600                 }
1601
1602                 // Is it a SYN packet and are we LISTENing?
1603
1604                 if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) {
1605                         // If we don't want to accept it, send a RST back
1606                         if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) {
1607                                 len = 1;
1608                                 goto reset;
1609                         }
1610
1611                         // Try to allocate memory, otherwise send a RST back
1612                         c = allocate_connection(utcp, hdr.dst, hdr.src);
1613
1614                         if(!c) {
1615                                 len = 1;
1616                                 goto reset;
1617                         }
1618
1619                         // Parse auxilliary information
1620                         if(init) {
1621                                 if(init[0] < 1) {
1622                                         len = 1;
1623                                         goto reset;
1624                                 }
1625
1626                                 c->flags = init[3] & 0x7;
1627                         } else {
1628                                 c->flags = UTCP_TCP;
1629                         }
1630
1631 synack:
1632                         // Return SYN+ACK, go to SYN_RECEIVED state
1633                         c->snd.wnd = hdr.wnd;
1634                         c->rcv.irs = hdr.seq;
1635                         c->rcv.nxt = c->rcv.irs + 1;
1636                         set_state(c, SYN_RECEIVED);
1637
1638                         struct {
1639                                 struct hdr hdr;
1640                                 uint8_t data[4];
1641                         } pkt;
1642
1643                         pkt.hdr.src = c->src;
1644                         pkt.hdr.dst = c->dst;
1645                         pkt.hdr.ack = c->rcv.irs + 1;
1646                         pkt.hdr.seq = c->snd.iss;
1647                         pkt.hdr.wnd = c->rcvbuf.maxsize;
1648                         pkt.hdr.ctl = SYN | ACK;
1649
1650                         if(init) {
1651                                 pkt.hdr.aux = 0x0101;
1652                                 pkt.data[0] = 1;
1653                                 pkt.data[1] = 0;
1654                                 pkt.data[2] = 0;
1655                                 pkt.data[3] = c->flags & 0x7;
1656                                 print_packet(c, "send", &pkt, sizeof(hdr) + 4);
1657                                 utcp->send(utcp, &pkt, sizeof(hdr) + 4);
1658                         } else {
1659                                 pkt.hdr.aux = 0;
1660                                 print_packet(c, "send", &pkt, sizeof(hdr));
1661                                 utcp->send(utcp, &pkt, sizeof(hdr));
1662                         }
1663
1664                         start_retransmit_timer(c);
1665                 } else {
1666                         // No, we don't want your packets, send a RST back
1667                         len = 1;
1668                         goto reset;
1669                 }
1670
1671                 return 0;
1672         }
1673
1674         debug(c, "state %s\n", strstate[c->state]);
1675
1676         // In case this is for a CLOSED connection, ignore the packet.
1677         // TODO: make it so incoming packets can never match a CLOSED connection.
1678
1679         if(c->state == CLOSED) {
1680                 debug(c, "got packet for closed connection\n");
1681                 return 0;
1682         }
1683
1684         // It is for an existing connection.
1685
1686         // 1. Drop invalid packets.
1687
1688         // 1a. Drop packets that should not happen in our current state.
1689
1690         switch(c->state) {
1691         case SYN_SENT:
1692         case SYN_RECEIVED:
1693         case ESTABLISHED:
1694         case FIN_WAIT_1:
1695         case FIN_WAIT_2:
1696         case CLOSE_WAIT:
1697         case CLOSING:
1698         case LAST_ACK:
1699         case TIME_WAIT:
1700                 break;
1701
1702         default:
1703 #ifdef UTCP_DEBUG
1704                 abort();
1705 #endif
1706                 break;
1707         }
1708
1709         // 1b. Discard data that is not in our receive window.
1710
1711         if(is_reliable(c)) {
1712                 bool acceptable;
1713
1714                 if(c->state == SYN_SENT) {
1715                         acceptable = true;
1716                 } else if(len == 0) {
1717                         acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
1718                 } else {
1719                         int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1720
1721                         // cut already accepted front overlapping
1722                         if(rcv_offset < 0) {
1723                                 acceptable = len > (size_t) - rcv_offset;
1724
1725                                 if(acceptable) {
1726                                         ptr -= rcv_offset;
1727                                         len += rcv_offset;
1728                                         hdr.seq -= rcv_offset;
1729                                 }
1730                         } else {
1731                                 acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
1732                         }
1733                 }
1734
1735                 if(!acceptable) {
1736                         debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
1737
1738                         // Ignore unacceptable RST packets.
1739                         if(hdr.ctl & RST) {
1740                                 return 0;
1741                         }
1742
1743                         // Otherwise, continue processing.
1744                         len = 0;
1745                 }
1746         } else {
1747 #if UTCP_DEBUG
1748                 int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
1749
1750                 if(rcv_offset) {
1751                         debug(c, "packet out of order, offset %u bytes\n", rcv_offset);
1752                 }
1753
1754 #endif
1755         }
1756
1757         c->snd.wnd = hdr.wnd; // TODO: move below
1758
1759         // 1c. Drop packets with an invalid ACK.
1760         // ackno should not roll back, and it should also not be bigger than what we ever could have sent
1761         // (= snd.una + c->sndbuf.used).
1762
1763         if(!is_reliable(c)) {
1764                 if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
1765                         hdr.ack = c->snd.una;
1766                 }
1767         }
1768
1769         if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
1770                 debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
1771
1772                 // Ignore unacceptable RST packets.
1773                 if(hdr.ctl & RST) {
1774                         return 0;
1775                 }
1776
1777                 goto reset;
1778         }
1779
1780         // 2. Handle RST packets
1781
1782         if(hdr.ctl & RST) {
1783                 switch(c->state) {
1784                 case SYN_SENT:
1785                         if(!(hdr.ctl & ACK)) {
1786                                 return 0;
1787                         }
1788
1789                         // The peer has refused our connection.
1790                         set_state(c, CLOSED);
1791                         errno = ECONNREFUSED;
1792
1793                         if(c->recv) {
1794                                 c->recv(c, NULL, 0);
1795                         }
1796
1797                         if(c->poll && !c->reapable) {
1798                                 c->poll(c, 0);
1799                         }
1800
1801                         return 0;
1802
1803                 case SYN_RECEIVED:
1804                         if(hdr.ctl & ACK) {
1805                                 return 0;
1806                         }
1807
1808                         // We haven't told the application about this connection yet. Silently delete.
1809                         free_connection(c);
1810                         return 0;
1811
1812                 case ESTABLISHED:
1813                 case FIN_WAIT_1:
1814                 case FIN_WAIT_2:
1815                 case CLOSE_WAIT:
1816                         if(hdr.ctl & ACK) {
1817                                 return 0;
1818                         }
1819
1820                         // The peer has aborted our connection.
1821                         set_state(c, CLOSED);
1822                         errno = ECONNRESET;
1823
1824                         if(c->recv) {
1825                                 c->recv(c, NULL, 0);
1826                         }
1827
1828                         if(c->poll && !c->reapable) {
1829                                 c->poll(c, 0);
1830                         }
1831
1832                         return 0;
1833
1834                 case CLOSING:
1835                 case LAST_ACK:
1836                 case TIME_WAIT:
1837                         if(hdr.ctl & ACK) {
1838                                 return 0;
1839                         }
1840
1841                         // As far as the application is concerned, the connection has already been closed.
1842                         // If it has called utcp_close() already, we can immediately free this connection.
1843                         if(c->reapable) {
1844                                 free_connection(c);
1845                                 return 0;
1846                         }
1847
1848                         // Otherwise, immediately move to the CLOSED state.
1849                         set_state(c, CLOSED);
1850                         return 0;
1851
1852                 default:
1853 #ifdef UTCP_DEBUG
1854                         abort();
1855 #endif
1856                         break;
1857                 }
1858         }
1859
1860         uint32_t advanced;
1861
1862         if(!(hdr.ctl & ACK)) {
1863                 advanced = 0;
1864                 goto skip_ack;
1865         }
1866
1867         // 3. Advance snd.una
1868
1869         advanced = seqdiff(hdr.ack, c->snd.una);
1870
1871         if(advanced) {
1872                 // RTT measurement
1873                 if(c->rtt_start.tv_sec) {
1874                         if(c->rtt_seq == hdr.ack) {
1875                                 struct timespec now;
1876                                 clock_gettime(UTCP_CLOCK, &now);
1877                                 int32_t diff = timespec_diff_usec(&now, &c->rtt_start);
1878                                 update_rtt(c, diff);
1879                                 c->rtt_start.tv_sec = 0;
1880                         } else if(c->rtt_seq < hdr.ack) {
1881                                 debug(c, "cancelling RTT measurement: %u < %u\n", c->rtt_seq, hdr.ack);
1882                                 c->rtt_start.tv_sec = 0;
1883                         }
1884                 }
1885
1886                 int32_t data_acked = advanced;
1887
1888                 switch(c->state) {
1889                 case SYN_SENT:
1890                 case SYN_RECEIVED:
1891                         data_acked--;
1892                         break;
1893
1894                 // TODO: handle FIN as well.
1895                 default:
1896                         break;
1897                 }
1898
1899                 assert(data_acked >= 0);
1900
1901 #ifndef NDEBUG
1902                 int32_t bufused = seqdiff(c->snd.last, c->snd.una);
1903                 assert(data_acked <= bufused);
1904 #endif
1905
1906                 if(data_acked) {
1907                         buffer_discard(&c->sndbuf, data_acked);
1908
1909                         if(is_reliable(c)) {
1910                                 c->do_poll = true;
1911                         }
1912                 }
1913
1914                 // Also advance snd.nxt if possible
1915                 if(seqdiff(c->snd.nxt, hdr.ack) < 0) {
1916                         c->snd.nxt = hdr.ack;
1917                 }
1918
1919                 c->snd.una = hdr.ack;
1920
1921                 if(c->dupack) {
1922                         if(c->dupack >= 3) {
1923                                 debug(c, "fast recovery ended\n");
1924                                 c->snd.cwnd = c->snd.ssthresh;
1925                         }
1926
1927                         c->dupack = 0;
1928                 }
1929
1930                 // Increase the congestion window according to RFC 5681
1931                 if(c->snd.cwnd < c->snd.ssthresh) {
1932                         c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
1933                 } else {
1934                         c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
1935                 }
1936
1937                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1938                         c->snd.cwnd = c->sndbuf.maxsize;
1939                 }
1940
1941                 debug_cwnd(c);
1942
1943                 // Check if we have sent a FIN that is now ACKed.
1944                 switch(c->state) {
1945                 case FIN_WAIT_1:
1946                         if(c->snd.una == c->snd.last) {
1947                                 set_state(c, FIN_WAIT_2);
1948                         }
1949
1950                         break;
1951
1952                 case CLOSING:
1953                         if(c->snd.una == c->snd.last) {
1954                                 clock_gettime(UTCP_CLOCK, &c->conn_timeout);
1955                                 c->conn_timeout.tv_sec += utcp->timeout;
1956                                 set_state(c, TIME_WAIT);
1957                         }
1958
1959                         break;
1960
1961                 default:
1962                         break;
1963                 }
1964         } else {
1965                 if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
1966                         c->dupack++;
1967                         debug(c, "duplicate ACK %d\n", c->dupack);
1968
1969                         if(c->dupack == 3) {
1970                                 // RFC 5681 fast recovery
1971                                 debug(c, "fast recovery started\n", c->dupack);
1972                                 uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
1973                                 c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
1974                                 c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
1975
1976                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1977                                         c->snd.cwnd = c->sndbuf.maxsize;
1978                                 }
1979
1980                                 debug_cwnd(c);
1981
1982                                 fast_retransmit(c);
1983                         } else if(c->dupack > 3) {
1984                                 c->snd.cwnd += utcp->mss;
1985
1986                                 if(c->snd.cwnd > c->sndbuf.maxsize) {
1987                                         c->snd.cwnd = c->sndbuf.maxsize;
1988                                 }
1989
1990                                 debug_cwnd(c);
1991                         }
1992
1993                         // We got an ACK which indicates the other side did get one of our packets.
1994                         // Reset the retransmission timer to avoid going to slow start,
1995                         // but don't touch the connection timeout.
1996                         start_retransmit_timer(c);
1997                 }
1998         }
1999
2000         // 4. Update timers
2001
2002         if(advanced) {
2003                 if(c->snd.una == c->snd.last) {
2004                         stop_retransmit_timer(c);
2005                         timespec_clear(&c->conn_timeout);
2006                 } else if(is_reliable(c)) {
2007                         start_retransmit_timer(c);
2008                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2009                         c->conn_timeout.tv_sec += utcp->timeout;
2010                 }
2011         }
2012
2013 skip_ack:
2014         // 5. Process SYN stuff
2015
2016         if(hdr.ctl & SYN) {
2017                 switch(c->state) {
2018                 case SYN_SENT:
2019
2020                         // This is a SYNACK. It should always have ACKed the SYN.
2021                         if(!advanced) {
2022                                 goto reset;
2023                         }
2024
2025                         c->rcv.irs = hdr.seq;
2026                         c->rcv.nxt = hdr.seq + 1;
2027
2028                         if(c->shut_wr) {
2029                                 c->snd.last++;
2030                                 set_state(c, FIN_WAIT_1);
2031                         } else {
2032                                 c->do_poll = true;
2033                                 set_state(c, ESTABLISHED);
2034                         }
2035
2036                         break;
2037
2038                 case SYN_RECEIVED:
2039                         // This is a retransmit of a SYN, send back the SYNACK.
2040                         goto synack;
2041
2042                 case ESTABLISHED:
2043                 case FIN_WAIT_1:
2044                 case FIN_WAIT_2:
2045                 case CLOSE_WAIT:
2046                 case CLOSING:
2047                 case LAST_ACK:
2048                 case TIME_WAIT:
2049                         // This could be a retransmission. Ignore the SYN flag, but send an ACK back.
2050                         break;
2051
2052                 default:
2053 #ifdef UTCP_DEBUG
2054                         abort();
2055 #endif
2056                         return 0;
2057                 }
2058         }
2059
2060         // 6. Process new data
2061
2062         if(c->state == SYN_RECEIVED) {
2063                 // This is the ACK after the SYNACK. It should always have ACKed the SYNACK.
2064                 if(!advanced) {
2065                         goto reset;
2066                 }
2067
2068                 // Are we still LISTENing?
2069                 if(utcp->accept) {
2070                         utcp->accept(c, c->src);
2071                 }
2072
2073                 if(c->state != ESTABLISHED) {
2074                         set_state(c, CLOSED);
2075                         c->reapable = true;
2076                         goto reset;
2077                 }
2078         }
2079
2080         if(len) {
2081                 switch(c->state) {
2082                 case SYN_SENT:
2083                 case SYN_RECEIVED:
2084                         // This should never happen.
2085 #ifdef UTCP_DEBUG
2086                         abort();
2087 #endif
2088                         return 0;
2089
2090                 case ESTABLISHED:
2091                 case FIN_WAIT_1:
2092                 case FIN_WAIT_2:
2093                         break;
2094
2095                 case CLOSE_WAIT:
2096                 case CLOSING:
2097                 case LAST_ACK:
2098                 case TIME_WAIT:
2099                         // Ehm no, We should never receive more data after a FIN.
2100                         goto reset;
2101
2102                 default:
2103 #ifdef UTCP_DEBUG
2104                         abort();
2105 #endif
2106                         return 0;
2107                 }
2108
2109                 handle_incoming_data(c, &hdr, ptr, len);
2110         }
2111
2112         // 7. Process FIN stuff
2113
2114         if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
2115                 switch(c->state) {
2116                 case SYN_SENT:
2117                 case SYN_RECEIVED:
2118                         // This should never happen.
2119 #ifdef UTCP_DEBUG
2120                         abort();
2121 #endif
2122                         break;
2123
2124                 case ESTABLISHED:
2125                         set_state(c, CLOSE_WAIT);
2126                         break;
2127
2128                 case FIN_WAIT_1:
2129                         set_state(c, CLOSING);
2130                         break;
2131
2132                 case FIN_WAIT_2:
2133                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2134                         c->conn_timeout.tv_sec += utcp->timeout;
2135                         set_state(c, TIME_WAIT);
2136                         break;
2137
2138                 case CLOSE_WAIT:
2139                 case CLOSING:
2140                 case LAST_ACK:
2141                 case TIME_WAIT:
2142                         // Ehm, no. We should never receive a second FIN.
2143                         goto reset;
2144
2145                 default:
2146 #ifdef UTCP_DEBUG
2147                         abort();
2148 #endif
2149                         break;
2150                 }
2151
2152                 // FIN counts as one sequence number
2153                 c->rcv.nxt++;
2154                 len++;
2155
2156                 // Inform the application that the peer closed its end of the connection.
2157                 if(c->recv) {
2158                         errno = 0;
2159                         c->recv(c, NULL, 0);
2160                 }
2161         }
2162
2163         // Now we send something back if:
2164         // - we received data, so we have to send back an ACK
2165         //   -> sendatleastone = true
2166         // - or we got an ack, so we should maybe send a bit more data
2167         //   -> sendatleastone = false
2168
2169         if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
2170                 ack(c, has_data);
2171         }
2172
2173         return 0;
2174
2175 reset:
2176         swap_ports(&hdr);
2177         hdr.wnd = 0;
2178         hdr.aux = 0;
2179
2180         if(hdr.ctl & ACK) {
2181                 hdr.seq = hdr.ack;
2182                 hdr.ctl = RST;
2183         } else {
2184                 hdr.ack = hdr.seq + len;
2185                 hdr.seq = 0;
2186                 hdr.ctl = RST | ACK;
2187         }
2188
2189         print_packet(c, "send", &hdr, sizeof(hdr));
2190         utcp->send(utcp, &hdr, sizeof(hdr));
2191         return 0;
2192
2193 }
2194
2195 int utcp_shutdown(struct utcp_connection *c, int dir) {
2196         debug(c, "shutdown %d at %u\n", dir, c ? c->snd.last : 0);
2197
2198         if(!c) {
2199                 errno = EFAULT;
2200                 return -1;
2201         }
2202
2203         if(c->reapable) {
2204                 debug(c, "shutdown() called on closed connection\n");
2205                 errno = EBADF;
2206                 return -1;
2207         }
2208
2209         if(!(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_WR || dir == UTCP_SHUT_RDWR)) {
2210                 errno = EINVAL;
2211                 return -1;
2212         }
2213
2214         // TCP does not have a provision for stopping incoming packets.
2215         // The best we can do is to just ignore them.
2216         if(dir == UTCP_SHUT_RD || dir == UTCP_SHUT_RDWR) {
2217                 c->recv = NULL;
2218         }
2219
2220         // The rest of the code deals with shutting down writes.
2221         if(dir == UTCP_SHUT_RD) {
2222                 return 0;
2223         }
2224
2225         // Only process shutting down writes once.
2226         if(c->shut_wr) {
2227                 return 0;
2228         }
2229
2230         c->shut_wr = true;
2231
2232         switch(c->state) {
2233         case CLOSED:
2234         case LISTEN:
2235                 errno = ENOTCONN;
2236                 return -1;
2237
2238         case SYN_SENT:
2239                 return 0;
2240
2241         case SYN_RECEIVED:
2242         case ESTABLISHED:
2243                 if(!is_reliable(c) && is_framed(c)) {
2244                         flush_unreliable_framed(c);
2245                 }
2246
2247                 set_state(c, FIN_WAIT_1);
2248                 break;
2249
2250         case FIN_WAIT_1:
2251         case FIN_WAIT_2:
2252                 return 0;
2253
2254         case CLOSE_WAIT:
2255                 set_state(c, CLOSING);
2256                 break;
2257
2258         case CLOSING:
2259         case LAST_ACK:
2260         case TIME_WAIT:
2261                 return 0;
2262         }
2263
2264         c->snd.last++;
2265
2266         ack(c, !is_reliable(c));
2267
2268         if(!timespec_isset(&c->rtrx_timeout)) {
2269                 start_retransmit_timer(c);
2270         }
2271
2272         return 0;
2273 }
2274
2275 static bool reset_connection(struct utcp_connection *c) {
2276         if(!c) {
2277                 errno = EFAULT;
2278                 return false;
2279         }
2280
2281         if(c->reapable) {
2282                 debug(c, "abort() called on closed connection\n");
2283                 errno = EBADF;
2284                 return false;
2285         }
2286
2287         c->recv = NULL;
2288         c->poll = NULL;
2289
2290         switch(c->state) {
2291         case CLOSED:
2292                 return true;
2293
2294         case LISTEN:
2295         case SYN_SENT:
2296         case CLOSING:
2297         case LAST_ACK:
2298         case TIME_WAIT:
2299                 set_state(c, CLOSED);
2300                 return true;
2301
2302         case SYN_RECEIVED:
2303         case ESTABLISHED:
2304         case FIN_WAIT_1:
2305         case FIN_WAIT_2:
2306         case CLOSE_WAIT:
2307                 set_state(c, CLOSED);
2308                 break;
2309         }
2310
2311         // Send RST
2312
2313         struct hdr hdr;
2314
2315         hdr.src = c->src;
2316         hdr.dst = c->dst;
2317         hdr.seq = c->snd.nxt;
2318         hdr.ack = 0;
2319         hdr.wnd = 0;
2320         hdr.ctl = RST;
2321
2322         print_packet(c, "send", &hdr, sizeof(hdr));
2323         c->utcp->send(c->utcp, &hdr, sizeof(hdr));
2324         return true;
2325 }
2326
2327 // Closes all the opened connections
2328 void utcp_abort_all_connections(struct utcp *utcp) {
2329         if(!utcp) {
2330                 errno = EINVAL;
2331                 return;
2332         }
2333
2334         for(int i = 0; i < utcp->nconnections; i++) {
2335                 struct utcp_connection *c = utcp->connections[i];
2336
2337                 if(c->reapable || c->state == CLOSED) {
2338                         continue;
2339                 }
2340
2341                 utcp_recv_t old_recv = c->recv;
2342                 utcp_poll_t old_poll = c->poll;
2343
2344                 reset_connection(c);
2345
2346                 if(old_recv) {
2347                         errno = 0;
2348                         old_recv(c, NULL, 0);
2349                 }
2350
2351                 if(old_poll && !c->reapable) {
2352                         errno = 0;
2353                         old_poll(c, 0);
2354                 }
2355         }
2356
2357         return;
2358 }
2359
2360 int utcp_close(struct utcp_connection *c) {
2361         debug(c, "closing\n");
2362
2363         if(c->rcvbuf.used) {
2364                 debug(c, "receive buffer not empty, resetting\n");
2365                 return reset_connection(c) ? 0 : -1;
2366         }
2367
2368         if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
2369                 return -1;
2370         }
2371
2372         c->recv = NULL;
2373         c->poll = NULL;
2374         c->reapable = true;
2375         return 0;
2376 }
2377
2378 int utcp_abort(struct utcp_connection *c) {
2379         if(!reset_connection(c)) {
2380                 return -1;
2381         }
2382
2383         c->reapable = true;
2384         return 0;
2385 }
2386
2387 /* Handle timeouts.
2388  * One call to this function will loop through all connections,
2389  * checking if something needs to be resent or not.
2390  * The return value is the time to the next timeout in milliseconds,
2391  * or maybe a negative value if the timeout is infinite.
2392  */
2393 struct timespec utcp_timeout(struct utcp *utcp) {
2394         struct timespec now;
2395         clock_gettime(UTCP_CLOCK, &now);
2396         struct timespec next = {now.tv_sec + 3600, now.tv_nsec};
2397
2398         for(int i = 0; i < utcp->nconnections; i++) {
2399                 struct utcp_connection *c = utcp->connections[i];
2400
2401                 if(!c) {
2402                         continue;
2403                 }
2404
2405                 // delete connections that have been utcp_close()d.
2406                 if(c->state == CLOSED) {
2407                         if(c->reapable) {
2408                                 debug(c, "reaping\n");
2409                                 free_connection(c);
2410                                 i--;
2411                         }
2412
2413                         continue;
2414                 }
2415
2416                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) {
2417                         errno = ETIMEDOUT;
2418                         c->state = CLOSED;
2419
2420                         if(c->recv) {
2421                                 c->recv(c, NULL, 0);
2422                         }
2423
2424                         if(c->poll && !c->reapable) {
2425                                 c->poll(c, 0);
2426                         }
2427
2428                         continue;
2429                 }
2430
2431                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &now)) {
2432                         debug(c, "retransmitting after timeout\n");
2433                         retransmit(c);
2434                 }
2435
2436                 if(c->poll) {
2437                         if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) {
2438                                 c->do_poll = false;
2439                                 uint32_t len = is_framed(c) ? min(buffer_free(&c->sndbuf), MAX_UNRELIABLE_SIZE) : buffer_free(&c->sndbuf);
2440
2441                                 if(len) {
2442                                         c->poll(c, len);
2443                                 }
2444                         } else if(c->state == CLOSED) {
2445                                 c->poll(c, 0);
2446                         }
2447                 }
2448
2449                 if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) {
2450                         next = c->conn_timeout;
2451                 }
2452
2453                 if(timespec_isset(&c->rtrx_timeout) && timespec_lt(&c->rtrx_timeout, &next)) {
2454                         next = c->rtrx_timeout;
2455                 }
2456         }
2457
2458         struct timespec diff;
2459
2460         timespec_sub(&next, &now, &diff);
2461
2462         return diff;
2463 }
2464
2465 bool utcp_is_active(struct utcp *utcp) {
2466         if(!utcp) {
2467                 return false;
2468         }
2469
2470         for(int i = 0; i < utcp->nconnections; i++)
2471                 if(utcp->connections[i]->state != CLOSED && utcp->connections[i]->state != TIME_WAIT) {
2472                         return true;
2473                 }
2474
2475         return false;
2476 }
2477
2478 struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) {
2479         if(!send) {
2480                 errno = EFAULT;
2481                 return NULL;
2482         }
2483
2484         struct utcp *utcp = calloc(1, sizeof(*utcp));
2485
2486         if(!utcp) {
2487                 return NULL;
2488         }
2489
2490         utcp_set_mtu(utcp, DEFAULT_MTU);
2491
2492         if(!utcp->pkt) {
2493                 free(utcp);
2494                 return NULL;
2495         }
2496
2497         if(!CLOCK_GRANULARITY) {
2498                 struct timespec res;
2499                 clock_getres(UTCP_CLOCK, &res);
2500                 CLOCK_GRANULARITY = res.tv_sec * USEC_PER_SEC + res.tv_nsec / 1000;
2501         }
2502
2503         utcp->accept = accept;
2504         utcp->pre_accept = pre_accept;
2505         utcp->send = send;
2506         utcp->priv = priv;
2507         utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
2508
2509         return utcp;
2510 }
2511
2512 void utcp_exit(struct utcp *utcp) {
2513         if(!utcp) {
2514                 return;
2515         }
2516
2517         for(int i = 0; i < utcp->nconnections; i++) {
2518                 struct utcp_connection *c = utcp->connections[i];
2519
2520                 if(!c->reapable) {
2521                         if(c->recv) {
2522                                 c->recv(c, NULL, 0);
2523                         }
2524
2525                         if(c->poll && !c->reapable) {
2526                                 c->poll(c, 0);
2527                         }
2528                 }
2529
2530                 buffer_exit(&c->rcvbuf);
2531                 buffer_exit(&c->sndbuf);
2532                 free(c);
2533         }
2534
2535         free(utcp->connections);
2536         free(utcp->pkt);
2537         free(utcp);
2538 }
2539
2540 uint16_t utcp_get_mtu(struct utcp *utcp) {
2541         return utcp ? utcp->mtu : 0;
2542 }
2543
2544 uint16_t utcp_get_mss(struct utcp *utcp) {
2545         return utcp ? utcp->mss : 0;
2546 }
2547
2548 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
2549         if(!utcp) {
2550                 return;
2551         }
2552
2553         if(mtu <= sizeof(struct hdr)) {
2554                 return;
2555         }
2556
2557         if(mtu > utcp->mtu) {
2558                 char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
2559
2560                 if(!new) {
2561                         return;
2562                 }
2563
2564                 utcp->pkt = new;
2565         }
2566
2567         utcp->mtu = mtu;
2568         utcp->mss = mtu - sizeof(struct hdr);
2569 }
2570
2571 void utcp_reset_timers(struct utcp *utcp) {
2572         if(!utcp) {
2573                 return;
2574         }
2575
2576         struct timespec now, then;
2577
2578         clock_gettime(UTCP_CLOCK, &now);
2579
2580         then = now;
2581
2582         then.tv_sec += utcp->timeout;
2583
2584         for(int i = 0; i < utcp->nconnections; i++) {
2585                 struct utcp_connection *c = utcp->connections[i];
2586
2587                 if(c->reapable) {
2588                         continue;
2589                 }
2590
2591                 if(timespec_isset(&c->rtrx_timeout)) {
2592                         c->rtrx_timeout = now;
2593                 }
2594
2595                 if(timespec_isset(&c->conn_timeout)) {
2596                         c->conn_timeout = then;
2597                 }
2598
2599                 c->rtt_start.tv_sec = 0;
2600
2601                 if(c->rto > START_RTO) {
2602                         c->rto = START_RTO;
2603                 }
2604         }
2605 }
2606
2607 int utcp_get_user_timeout(struct utcp *u) {
2608         return u ? u->timeout : 0;
2609 }
2610
2611 void utcp_set_user_timeout(struct utcp *u, int timeout) {
2612         if(u) {
2613                 u->timeout = timeout;
2614         }
2615 }
2616
2617 size_t utcp_get_sndbuf(struct utcp_connection *c) {
2618         return c ? c->sndbuf.maxsize : 0;
2619 }
2620
2621 size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
2622         if(!c) {
2623                 return 0;
2624         }
2625
2626         switch(c->state) {
2627         case SYN_SENT:
2628         case SYN_RECEIVED:
2629         case ESTABLISHED:
2630         case CLOSE_WAIT:
2631                 return buffer_free(&c->sndbuf);
2632
2633         default:
2634                 return 0;
2635         }
2636 }
2637
2638 void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
2639         if(!c) {
2640                 return;
2641         }
2642
2643         c->sndbuf.maxsize = size;
2644
2645         if(c->sndbuf.maxsize != size) {
2646                 c->sndbuf.maxsize = -1;
2647         }
2648
2649         c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2650 }
2651
2652 size_t utcp_get_rcvbuf(struct utcp_connection *c) {
2653         return c ? c->rcvbuf.maxsize : 0;
2654 }
2655
2656 size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
2657         if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) {
2658                 return buffer_free(&c->rcvbuf);
2659         } else {
2660                 return 0;
2661         }
2662 }
2663
2664 void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
2665         if(!c) {
2666                 return;
2667         }
2668
2669         c->rcvbuf.maxsize = size;
2670
2671         if(c->rcvbuf.maxsize != size) {
2672                 c->rcvbuf.maxsize = -1;
2673         }
2674 }
2675
2676 size_t utcp_get_sendq(struct utcp_connection *c) {
2677         return c->sndbuf.used;
2678 }
2679
2680 size_t utcp_get_recvq(struct utcp_connection *c) {
2681         return c->rcvbuf.used;
2682 }
2683
2684 bool utcp_get_nodelay(struct utcp_connection *c) {
2685         return c ? c->nodelay : false;
2686 }
2687
2688 void utcp_set_nodelay(struct utcp_connection *c, bool nodelay) {
2689         if(c) {
2690                 c->nodelay = nodelay;
2691         }
2692 }
2693
2694 bool utcp_get_keepalive(struct utcp_connection *c) {
2695         return c ? c->keepalive : false;
2696 }
2697
2698 void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) {
2699         if(c) {
2700                 c->keepalive = keepalive;
2701         }
2702 }
2703
2704 size_t utcp_get_outq(struct utcp_connection *c) {
2705         return c ? seqdiff(c->snd.nxt, c->snd.una) : 0;
2706 }
2707
2708 void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) {
2709         if(c) {
2710                 c->recv = recv;
2711         }
2712 }
2713
2714 void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) {
2715         if(c) {
2716                 c->poll = poll;
2717                 c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
2718         }
2719 }
2720
2721 void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) {
2722         if(utcp) {
2723                 utcp->accept = accept;
2724                 utcp->pre_accept = pre_accept;
2725         }
2726 }
2727
2728 void utcp_expect_data(struct utcp_connection *c, bool expect) {
2729         if(!c || c->reapable) {
2730                 return;
2731         }
2732
2733         if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
2734                 return;
2735         }
2736
2737         if(expect) {
2738                 // If we expect data, start the connection timer.
2739                 if(!timespec_isset(&c->conn_timeout)) {
2740                         clock_gettime(UTCP_CLOCK, &c->conn_timeout);
2741                         c->conn_timeout.tv_sec += c->utcp->timeout;
2742                 }
2743         } else {
2744                 // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
2745                 if(c->snd.una == c->snd.last) {
2746                         timespec_clear(&c->conn_timeout);
2747                 }
2748         }
2749 }
2750
2751 void utcp_offline(struct utcp *utcp, bool offline) {
2752         struct timespec now;
2753         clock_gettime(UTCP_CLOCK, &now);
2754
2755         for(int i = 0; i < utcp->nconnections; i++) {
2756                 struct utcp_connection *c = utcp->connections[i];
2757
2758                 if(c->reapable) {
2759                         continue;
2760                 }
2761
2762                 utcp_expect_data(c, offline);
2763
2764                 if(!offline) {
2765                         if(timespec_isset(&c->rtrx_timeout)) {
2766                                 c->rtrx_timeout = now;
2767                         }
2768
2769                         utcp->connections[i]->rtt_start.tv_sec = 0;
2770
2771                         if(c->rto > START_RTO) {
2772                                 c->rto = START_RTO;
2773                         }
2774                 }
2775         }
2776 }
2777
2778 void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) {
2779         utcp->retransmit = cb;
2780 }
2781
2782 void utcp_set_clock_granularity(long granularity) {
2783         CLOCK_GRANULARITY = granularity;
2784 }
2785
2786 int utcp_get_flush_timeout(struct utcp *utcp) {
2787         return utcp->flush_timeout;
2788 }
2789
2790 void utcp_set_flush_timeout(struct utcp *utcp, int milliseconds) {
2791         utcp->flush_timeout = milliseconds;
2792 }