]> git.meshlink.io Git - utcp/blobdiff - utcp.c
Implement ringbuffers.
[utcp] / utcp.c
diff --git a/utcp.c b/utcp.c
index 7e28f9e27e3aef2a7f0937597eabbd5bd3c36f7a..f94c6e7e1be55ddef85b20efaff91ba143d9a620 100644 (file)
--- a/utcp.c
+++ b/utcp.c
@@ -153,12 +153,40 @@ static int32_t seqdiff(uint32_t a, uint32_t b) {
 }
 
 // Buffer functions
-// TODO: convert to ringbuffers to avoid memmove() operations.
+static bool buffer_wraps(struct buffer *buf) {
+       return buf->size - buf->offset < buf->used;
+}
+
+static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
+       char *newdata = realloc(buf->data, newsize);
+
+       if(!newdata) {
+               return false;
+       }
+
+       buf->data = newdata;
+
+       if(buffer_wraps(buf)) {
+               // Shift the right part of the buffer until it hits the end of the new buffer.
+               // Old situation:
+               // [345......012]
+               // New situation:
+               // [345.........|........012]
+               uint32_t tailsize = buf->size - buf->offset;
+               uint32_t newoffset = newsize - tailsize;
+               memmove(buf + newoffset, buf + buf->offset, tailsize);
+               buf->offset = newoffset;
+       }
+
+       buf->size = newsize;
+       return true;
+}
 
 // Store data into the buffer
 static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
        debug("buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
 
+       // Ensure we don't store more than maxsize bytes in total
        size_t required = offset + len;
 
        if(required > buf->maxsize) {
@@ -170,32 +198,41 @@ static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data
                required = buf->maxsize;
        }
 
+       // Check if we need to resize the buffer
        if(required > buf->size) {
                size_t newsize = buf->size;
 
                if(!newsize) {
-                       newsize = required;
-               } else {
-                       do {
-                               newsize *= 2;
-                       } while(newsize < required);
+                       newsize = 4096;
                }
 
+               do {
+                       newsize *= 2;
+               } while(newsize < required);
+
                if(newsize > buf->maxsize) {
                        newsize = buf->maxsize;
                }
 
-               char *newdata = realloc(buf->data, newsize);
-
-               if(!newdata) {
+               if(!buffer_resize(buf, newsize)) {
                        return -1;
                }
+       }
 
-               buf->data = newdata;
-               buf->size = newsize;
+       uint32_t realoffset = buf->offset + offset;
+
+       if(buf->size - buf->offset < offset) {
+               // The offset wrapped
+               realoffset -= buf->size;
        }
 
-       memcpy(buf->data + offset, data, len);
+       if(buf->size - realoffset < len) {
+               // The new chunk of data must be wrapped
+               memcpy(buf->data + realoffset, data, buf->size - realoffset);
+               memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
+       } else {
+               memcpy(buf->data + realoffset, data, len);
+       }
 
        if(required > buf->used) {
                buf->used = required;
@@ -208,52 +245,72 @@ static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) {
        return buffer_put_at(buf, buf->used, data, len);
 }
 
-// Get data from the buffer. data can be NULL.
-static ssize_t buffer_get(struct buffer *buf, void *data, size_t len) {
-       if(len > buf->used) {
-               len = buf->used;
+// Copy data from the buffer without removing it.
+static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
+       // Ensure we don't copy more than is actually stored in the buffer
+       if(offset >= buf->used) {
+               return 0;
        }
 
-       if(data) {
-               memcpy(data, buf->data, len);
+       if(buf->used - offset < len) {
+               len = buf->used - offset;
        }
 
-       if(len < buf->used) {
-               memmove(buf->data, buf->data + len, buf->used - len);
+       uint32_t realoffset = buf->offset + offset;
+
+       if(buf->size - buf->offset < offset) {
+               // The offset wrapped
+               realoffset -= buf->size;
+       }
+
+       if(buf->size - realoffset < len) {
+               // The data is wrapped
+               memcpy(data, buf->data + realoffset, buf->size - realoffset);
+               memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
+       } else {
+               memcpy(data, buf->data + realoffset, len);
        }
 
-       buf->used -= len;
        return len;
 }
 
-// Copy data from the buffer without removing it.
-static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
-       if(offset >= buf->used) {
-               return 0;
-       }
+// Get data from the buffer.
+static ssize_t buffer_get(struct buffer *buf, void *data, size_t len) {
+       len = buffer_copy(buf, data, 0, len);
 
-       if(offset + len > buf->used) {
-               len = buf->used - offset;
+       if(buf->size - buf->offset < len) {
+               buf->offset -= buf->size;
        }
 
-       memcpy(data, buf->data + offset, len);
+       buf->offset += len;
+       buf->used -= len;
        return len;
 }
 
-static bool buffer_init(struct buffer *buf, uint32_t len, uint32_t maxlen) {
-       memset(buf, 0, sizeof(*buf));
+// Discard data from the buffer.
+static ssize_t buffer_discard(struct buffer *buf, size_t len) {
+       if(buf->used < len) {
+               len = buf->used;
+       }
 
-       if(len) {
-               buf->data = malloc(len);
+       if(buf->size - buf->offset < len) {
+               buf->offset -= buf->size;
+       }
 
-               if(!buf->data) {
-                       return false;
-               }
+       buf->offset += len;
+       buf->used -= len;
+
+       return len;
+}
+
+static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
+       if(maxsize < minsize) {
+               maxsize = minsize;
        }
 
-       buf->size = len;
-       buf->maxsize = maxlen;
-       return true;
+       buf->maxsize = maxsize;
+
+       return buf->size >= minsize || buffer_resize(buf, minsize);
 }
 
 static void buffer_exit(struct buffer *buf) {
@@ -360,12 +417,12 @@ static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t s
                return NULL;
        }
 
-       if(!buffer_init(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
+       if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
                free(c);
                return NULL;
        }
 
-       if(!buffer_init(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
+       if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
                buffer_exit(&c->sndbuf);
                free(c);
                return NULL;
@@ -415,13 +472,13 @@ static void update_rtt(struct utcp_connection *c, uint32_t rtt) {
        if(!utcp->srtt) {
                utcp->srtt = rtt;
                utcp->rttvar = rtt / 2;
-               utcp->rto = rtt + max(2 * rtt, CLOCK_GRANULARITY);
        } else {
                utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4;
                utcp->srtt = (utcp->srtt * 7 + rtt) / 8;
-               utcp->rto = utcp->srtt + max(utcp->rttvar, CLOCK_GRANULARITY);
        }
 
+       utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY);
+
        if(utcp->rto > MAX_RTO) {
                utcp->rto = MAX_RTO;
        }
@@ -527,13 +584,7 @@ static void ack(struct utcp_connection *c, bool sendatleastone) {
        struct {
                struct hdr hdr;
                uint8_t data[];
-       } *pkt;
-
-       pkt = malloc(sizeof(pkt->hdr) + c->utcp->mtu);
-
-       if(!pkt) {
-               return;
-       }
+       } *pkt = c->utcp->pkt;
 
        pkt->hdr.src = c->src;
        pkt->hdr.dst = c->dst;
@@ -566,8 +617,6 @@ static void ack(struct utcp_connection *c, bool sendatleastone) {
                print_packet(c->utcp, "send", pkt, sizeof(pkt->hdr) + seglen);
                c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
        } while(left);
-
-       free(pkt);
 }
 
 ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
@@ -629,6 +678,8 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
 
        if(is_reliable(c) || (c->state != SYN_SENT && c->state != SYN_RECEIVED)) {
                len = buffer_put(&c->sndbuf, data, len);
+       } else {
+               return 0;
        }
 
        if(len <= 0) {
@@ -685,13 +736,7 @@ static void retransmit(struct utcp_connection *c) {
        struct {
                struct hdr hdr;
                uint8_t data[];
-       } *pkt;
-
-       pkt = malloc(sizeof(pkt->hdr) + c->utcp->mtu);
-
-       if(!pkt) {
-               return;
-       }
+       } *pkt = c->utcp->pkt;
 
        pkt->hdr.src = c->src;
        pkt->hdr.dst = c->dst;
@@ -771,7 +816,7 @@ static void retransmit(struct utcp_connection *c) {
        c->rtt_start.tv_sec = 0; // invalidate RTT timer
 
 cleanup:
-       free(pkt);
+       return;
 }
 
 /* Update receive buffer and SACK entries after consuming data.
@@ -1011,6 +1056,8 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
                ptr += 2;
        }
 
+       bool has_data = len || (hdr.ctl & (SYN | FIN));
+
        // Try to match the packet to an existing connection
 
        struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
@@ -1053,6 +1100,7 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
                                c->flags = UTCP_TCP;
                        }
 
+synack:
                        // Return SYN+ACK, go to SYN_RECEIVED state
                        c->snd.wnd = hdr.wnd;
                        c->rcv.irs = hdr.seq;
@@ -1105,8 +1153,6 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
 
        // It is for an existing connection.
 
-       uint32_t prevrcvnxt = c->rcv.nxt;
-
        // 1. Drop invalid packets.
 
        // 1a. Drop packets that should not happen in our current state.
@@ -1130,7 +1176,7 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
                break;
        }
 
-       // 1b. Drop packets with a sequence number not in our receive window.
+       // 1b. Discard data that is not in our receive window.
 
        if(is_reliable(c)) {
                bool acceptable;
@@ -1282,7 +1328,6 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
        // 3. Advance snd.una
 
        advanced = seqdiff(hdr.ack, c->snd.una);
-       prevrcvnxt = c->rcv.nxt;
 
        if(advanced) {
                // RTT measurement
@@ -1314,11 +1359,13 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
 
                assert(data_acked >= 0);
 
+#ifndef NDEBUG
                int32_t bufused = seqdiff(c->snd.last, c->snd.una);
                assert(data_acked <= bufused);
+#endif
 
                if(data_acked) {
-                       buffer_get(&c->sndbuf, NULL, data_acked);
+                       buffer_discard(&c->sndbuf, data_acked);
                }
 
                // Also advance snd.nxt if possible
@@ -1347,7 +1394,7 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) {
                case CLOSING:
                        if(c->snd.una == c->snd.last) {
                                gettimeofday(&c->conn_timeout, NULL);
-                               c->conn_timeout.tv_sec += 60;
+                               c->conn_timeout.tv_sec += utcp->timeout;
                                set_state(c, TIME_WAIT);
                        }
 
@@ -1411,6 +1458,9 @@ skip_ack:
                        break;
 
                case SYN_RECEIVED:
+                       // This is a retransmit of a SYN, send back the SYNACK.
+                       goto synack;
+
                case ESTABLISHED:
                case FIN_WAIT_1:
                case FIN_WAIT_2:
@@ -1506,7 +1556,7 @@ skip_ack:
 
                case FIN_WAIT_2:
                        gettimeofday(&c->conn_timeout, NULL);
-                       c->conn_timeout.tv_sec += 60;
+                       c->conn_timeout.tv_sec += utcp->timeout;
                        set_state(c, TIME_WAIT);
                        break;
 
@@ -1536,13 +1586,13 @@ skip_ack:
        }
 
        // Now we send something back if:
-       // - we advanced rcv.nxt (ie, we got some data that needs to be ACKed)
+       // - we received data, so we have to send back an ACK
        //   -> sendatleastone = true
        // - or we got an ack, so we should maybe send a bit more data
        //   -> sendatleastone = false
 
        if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
-               ack(c, len || prevrcvnxt != c->rcv.nxt);
+               ack(c, has_data);
        }
 
        return 0;
@@ -1854,7 +1904,7 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_
        utcp->pre_accept = pre_accept;
        utcp->send = send;
        utcp->priv = priv;
-       utcp->mtu = DEFAULT_MTU;
+       utcp_set_mtu(utcp, DEFAULT_MTU);
        utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
        utcp->rto = START_RTO; // usec
 
@@ -1893,10 +1943,25 @@ uint16_t utcp_get_mtu(struct utcp *utcp) {
 }
 
 void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
-       // TODO: handle overhead of the header
-       if(utcp) {
-               utcp->mtu = mtu;
+       if(!utcp) {
+               return;
        }
+
+       if(mtu <= sizeof(struct hdr)) {
+               return;
+       }
+
+       if(mtu > utcp->mtu) {
+               char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
+
+               if(!new) {
+                       return;
+               }
+
+               utcp->pkt = new;
+       }
+
+       utcp->mtu = mtu;
 }
 
 void utcp_reset_timers(struct utcp *utcp) {
@@ -1919,8 +1984,14 @@ void utcp_reset_timers(struct utcp *utcp) {
                        continue;
                }
 
-               c->rtrx_timeout = now;
-               c->conn_timeout = then;
+               if(timerisset(&c->rtrx_timeout)) {
+                       c->rtrx_timeout = now;
+               }
+
+               if(timerisset(&c->conn_timeout)) {
+                       c->conn_timeout = then;
+               }
+
                c->rtt_start.tv_sec = 0;
        }
 
@@ -2071,19 +2142,24 @@ void utcp_expect_data(struct utcp_connection *c, bool expect) {
 }
 
 void utcp_offline(struct utcp *utcp, bool offline) {
+       struct timeval now;
+       gettimeofday(&now, NULL);
+
        for(int i = 0; i < utcp->nconnections; i++) {
                struct utcp_connection *c = utcp->connections[i];
 
-               if(!c->reapable) {
-                       utcp_expect_data(c, offline);
-
-                       // If we are online again, reset the retransmission timers, but keep the connection timeout as it is,
-                       // to prevent peers toggling online/offline state frequently from keeping connections alive
-                       // if there is no progress in sending actual data.
-                       if(!offline) {
-                               gettimeofday(&utcp->connections[i]->rtrx_timeout, NULL);
-                               utcp->connections[i]->rtt_start.tv_sec = 0;
+               if(c->reapable) {
+                       continue;
+               }
+
+               utcp_expect_data(c, offline);
+
+               if(!offline) {
+                       if(timerisset(&c->rtrx_timeout)) {
+                               c->rtrx_timeout = now;
                        }
+
+                       utcp->connections[i]->rtt_start.tv_sec = 0;
                }
        }