}
// Buffer functions
-// TODO: convert to ringbuffers to avoid memmove() operations.
+static bool buffer_wraps(struct buffer *buf) {
+ return buf->size - buf->offset < buf->used;
+}
+
+static bool buffer_resize(struct buffer *buf, uint32_t newsize) {
+ char *newdata = realloc(buf->data, newsize);
+
+ if(!newdata) {
+ return false;
+ }
+
+ buf->data = newdata;
+
+ if(buffer_wraps(buf)) {
+ // Shift the right part of the buffer until it hits the end of the new buffer.
+ // Old situation:
+ // [345......012]
+ // New situation:
+ // [345.........|........012]
+ uint32_t tailsize = buf->size - buf->offset;
+ uint32_t newoffset = newsize - tailsize;
+ memmove(buf + newoffset, buf + buf->offset, tailsize);
+ buf->offset = newoffset;
+ }
+
+ buf->size = newsize;
+ return true;
+}
// Store data into the buffer
static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) {
debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len);
+ // Ensure we don't store more than maxsize bytes in total
size_t required = offset + len;
if(required > buf->maxsize) {
required = buf->maxsize;
}
+ // Check if we need to resize the buffer
if(required > buf->size) {
size_t newsize = buf->size;
if(!newsize) {
- newsize = required;
- } else {
- do {
- newsize *= 2;
- } while(newsize < required);
+ newsize = 4096;
}
+ do {
+ newsize *= 2;
+ } while(newsize < required);
+
if(newsize > buf->maxsize) {
newsize = buf->maxsize;
}
- char *newdata = realloc(buf->data, newsize);
-
- if(!newdata) {
+ if(!buffer_resize(buf, newsize)) {
return -1;
}
+ }
- buf->data = newdata;
- buf->size = newsize;
+ uint32_t realoffset = buf->offset + offset;
+
+ if(buf->size - buf->offset < offset) {
+ // The offset wrapped
+ realoffset -= buf->size;
}
- memcpy(buf->data + offset, data, len);
+ if(buf->size - realoffset < len) {
+ // The new chunk of data must be wrapped
+ memcpy(buf->data + realoffset, data, buf->size - realoffset);
+ memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset));
+ } else {
+ memcpy(buf->data + realoffset, data, len);
+ }
if(required > buf->used) {
buf->used = required;
return buffer_put_at(buf, buf->used, data, len);
}
-// Get data from the buffer. data can be NULL.
-static ssize_t buffer_get(struct buffer *buf, void *data, size_t len) {
- if(len > buf->used) {
- len = buf->used;
+// Copy data from the buffer without removing it.
+static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
+ // Ensure we don't copy more than is actually stored in the buffer
+ if(offset >= buf->used) {
+ return 0;
}
- if(data) {
- memcpy(data, buf->data, len);
+ if(buf->used - offset < len) {
+ len = buf->used - offset;
}
- if(len < buf->used) {
- memmove(buf->data, buf->data + len, buf->used - len);
+ uint32_t realoffset = buf->offset + offset;
+
+ if(buf->size - buf->offset < offset) {
+ // The offset wrapped
+ realoffset -= buf->size;
+ }
+
+ if(buf->size - realoffset < len) {
+ // The data is wrapped
+ memcpy(data, buf->data + realoffset, buf->size - realoffset);
+ memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset));
+ } else {
+ memcpy(data, buf->data + realoffset, len);
}
- buf->used -= len;
return len;
}
-// Copy data from the buffer without removing it.
-static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) {
- if(offset >= buf->used) {
- return 0;
+// Discard data from the buffer.
+static ssize_t buffer_discard(struct buffer *buf, size_t len) {
+ if(buf->used < len) {
+ len = buf->used;
}
- if(offset + len > buf->used) {
- len = buf->used - offset;
+ if(buf->size - buf->offset < len) {
+ buf->offset -= buf->size;
}
- memcpy(data, buf->data + offset, len);
+ buf->offset += len;
+ buf->used -= len;
+
return len;
}
-static bool buffer_init(struct buffer *buf, uint32_t len, uint32_t maxlen) {
- memset(buf, 0, sizeof(*buf));
-
- if(len) {
- buf->data = malloc(len);
-
- if(!buf->data) {
- return false;
- }
+static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) {
+ if(maxsize < minsize) {
+ maxsize = minsize;
}
- buf->size = len;
- buf->maxsize = maxlen;
- return true;
+ buf->maxsize = maxsize;
+
+ return buf->size >= minsize || buffer_resize(buf, minsize);
}
static void buffer_exit(struct buffer *buf) {
return NULL;
}
- if(!buffer_init(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
+ if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) {
free(c);
return NULL;
}
- if(!buffer_init(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
+ if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) {
buffer_exit(&c->sndbuf);
free(c);
return NULL;
c->snd.una = c->snd.iss;
c->snd.nxt = c->snd.iss + 1;
c->snd.last = c->snd.nxt;
- c->snd.cwnd = (utcp->mtu > 2190 ? 2 : utcp->mtu > 1095 ? 3 : 4) * utcp->mtu;
+ c->snd.cwnd = (utcp->mss > 2190 ? 2 : utcp->mss > 1095 ? 3 : 4) * utcp->mss;
c->snd.ssthresh = ~0;
debug_cwnd(c);
c->utcp = utcp;
} else if(cwndleft < left) {
left = cwndleft;
- if(!sendatleastone || cwndleft > c->utcp->mtu) {
- left -= left % c->utcp->mtu;
+ if(!sendatleastone || cwndleft > c->utcp->mss) {
+ left -= left % c->utcp->mss;
}
}
struct {
struct hdr hdr;
uint8_t data[];
- } *pkt;
-
- pkt = malloc(sizeof(pkt->hdr) + c->utcp->mtu);
-
- if(!pkt) {
- return;
- }
+ } *pkt = c->utcp->pkt;
pkt->hdr.src = c->src;
pkt->hdr.dst = c->dst;
pkt->hdr.aux = 0;
do {
- uint32_t seglen = left > c->utcp->mtu ? c->utcp->mtu : left;
+ uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left;
pkt->hdr.seq = c->snd.nxt;
buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen);
print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen);
c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen);
} while(left);
-
- free(pkt);
}
ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) {
if(!is_reliable(c)) {
c->snd.una = c->snd.nxt = c->snd.last;
- buffer_get(&c->sndbuf, NULL, c->sndbuf.used);
+ buffer_discard(&c->sndbuf, c->sndbuf.used);
}
if(is_reliable(c) && !timerisset(&c->rtrx_timeout)) {
uint8_t data[];
} *pkt;
- pkt = malloc(sizeof(pkt->hdr) + c->utcp->mtu);
+ pkt = malloc(c->utcp->mtu);
if(!pkt) {
return;
pkt->hdr.seq = c->snd.una;
pkt->hdr.ack = c->rcv.nxt;
pkt->hdr.ctl = ACK;
- uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mtu);
+ uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
if(fin_wanted(c, c->snd.una + len)) {
len--;
struct {
struct hdr hdr;
uint8_t data[];
- } *pkt;
-
- pkt = malloc(sizeof(pkt->hdr) + c->utcp->mtu);
-
- if(!pkt) {
- return;
- }
+ } *pkt = c->utcp->pkt;
pkt->hdr.src = c->src;
pkt->hdr.dst = c->dst;
pkt->hdr.seq = c->snd.una;
pkt->hdr.ack = c->rcv.nxt;
pkt->hdr.ctl = ACK;
- uint32_t len = seqdiff(c->snd.last, c->snd.una);
-
- if(len > utcp->mtu) {
- len = utcp->mtu;
- }
+ uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss);
if(fin_wanted(c, c->snd.una + len)) {
len--;
pkt->hdr.ctl |= FIN;
}
- c->snd.nxt = c->snd.una + len;
-
// RFC 5681 slow start after timeout
- c->snd.ssthresh = max(c->snd.cwnd / 2, utcp->mtu * 2); // eq. 4
- c->snd.cwnd = utcp->mtu;
+ uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
+ c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
+ c->snd.cwnd = utcp->mss;
debug_cwnd(c);
buffer_copy(&c->sndbuf, pkt->data, 0, len);
print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len);
utcp->send(utcp, pkt, sizeof(pkt->hdr) + len);
+
+ c->snd.nxt = c->snd.una + len;
break;
case CLOSED:
}
c->rtt_start.tv_sec = 0; // invalidate RTT timer
+ c->dupack = 0; // cancel any ongoing fast recovery
cleanup:
- free(pkt);
+ return;
}
/* Update receive buffer and SACK entries after consuming data.
return;
}
- buffer_get(&c->rcvbuf, NULL, len);
+ buffer_discard(&c->rcvbuf, len);
for(int i = 0; i < NSACKS && c->sacks[i].len;) {
if(len < c->sacks[i].offset) {
// Otherwise, continue processing.
len = 0;
}
+ } else {
+#if UTCP_DEBUG
+ int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
+
+ if(rcv_offset) {
+ debug(c, "packet out of order, offset %u bytes", rcv_offset);
+ }
+
+ if(rcv_offset >= 0) {
+ c->rcv.nxt = hdr.seq + len;
+ }
+
+#endif
}
c->snd.wnd = hdr.wnd; // TODO: move below
#endif
if(data_acked) {
- buffer_get(&c->sndbuf, NULL, data_acked);
+ buffer_discard(&c->sndbuf, data_acked);
}
// Also advance snd.nxt if possible
// Increase the congestion window according to RFC 5681
if(c->snd.cwnd < c->snd.ssthresh) {
- c->snd.cwnd += min(advanced, utcp->mtu); // eq. 2
+ c->snd.cwnd += min(advanced, utcp->mss); // eq. 2
} else {
- c->snd.cwnd += max(1, (utcp->mtu * utcp->mtu) / c->snd.cwnd); // eq. 3
+ c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3
}
if(c->snd.cwnd > c->sndbuf.maxsize) {
break;
}
} else {
- if(!len && is_reliable(c)) {
+ if(!len && is_reliable(c) && c->snd.una != c->snd.last) {
c->dupack++;
debug(c, "duplicate ACK %d\n", c->dupack);
if(c->dupack == 3) {
// RFC 5681 fast recovery
debug(c, "fast recovery started\n", c->dupack);
- c->snd.ssthresh = max(c->snd.cwnd / 2, utcp->mtu * 2); // eq. 4
- c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mtu, c->sndbuf.maxsize);
+ uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una);
+ c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4
+ c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize);
if(c->snd.cwnd > c->sndbuf.maxsize) {
c->snd.cwnd = c->sndbuf.maxsize;
fast_retransmit(c);
} else if(c->dupack > 3) {
- c->snd.cwnd += utcp->mtu;
+ c->snd.cwnd += utcp->mss;
if(c->snd.cwnd > c->sndbuf.maxsize) {
c->snd.cwnd = c->sndbuf.maxsize;
debug_cwnd(c);
}
+
+ // We got an ACK which indicates the other side did get one of our packets.
+ // Reset the retransmission timer to avoid going to slow start,
+ // but don't touch the connection timeout.
+ start_retransmit_timer(c);
}
}
utcp->pre_accept = pre_accept;
utcp->send = send;
utcp->priv = priv;
- utcp->mtu = DEFAULT_MTU;
+ utcp_set_mtu(utcp, DEFAULT_MTU);
utcp->timeout = DEFAULT_USER_TIMEOUT; // sec
utcp->rto = START_RTO; // usec
return utcp ? utcp->mtu : 0;
}
+uint16_t utcp_get_mss(struct utcp *utcp) {
+ return utcp ? utcp->mss : 0;
+}
+
void utcp_set_mtu(struct utcp *utcp, uint16_t mtu) {
- // TODO: handle overhead of the header
- if(utcp) {
- utcp->mtu = mtu;
+ if(!utcp) {
+ return;
+ }
+
+ if(mtu <= sizeof(struct hdr)) {
+ return;
}
+
+ if(mtu > utcp->mtu) {
+ char *new = realloc(utcp->pkt, mtu + sizeof(struct hdr));
+
+ if(!new) {
+ return;
+ }
+
+ utcp->pkt = new;
+ }
+
+ utcp->mtu = mtu;
+ utcp->mss = mtu - sizeof(struct hdr);
}
void utcp_reset_timers(struct utcp *utcp) {