X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=src%2Futcp.c;fp=src%2Futcp.c;h=0909a0101e196fdecc91b991948c4e5495b10380;hb=2178842d04c649ed22457dc0829153fd1092521c;hp=dc30fe69c603278549694cc8b6c24225ee28956e;hpb=0e03a8ad0de862ea431110d27d4659c082855f98;p=meshlink-tiny diff --git a/src/utcp.c b/src/utcp.c index dc30fe6..0909a01 100644 --- a/src/utcp.c +++ b/src/utcp.c @@ -186,274 +186,6 @@ static int32_t seqdiff(uint32_t a, uint32_t b) { return a - b; } -// Buffer functions -static bool buffer_wraps(struct buffer *buf) { - return buf->size - buf->offset < buf->used; -} - -static bool buffer_resize(struct buffer *buf, uint32_t newsize) { - assert(!buf->external); - - if(!newsize) { - free(buf->data); - buf->data = NULL; - buf->size = 0; - buf->offset = 0; - return true; - } - - char *newdata = realloc(buf->data, newsize); - - if(!newdata) { - return false; - } - - buf->data = newdata; - - if(buffer_wraps(buf)) { - // Shift the right part of the buffer until it hits the end of the new buffer. - // Old situation: - // [345......012] - // New situation: - // [345.........|........012] - uint32_t tailsize = buf->size - buf->offset; - uint32_t newoffset = newsize - tailsize; - memmove(buf->data + newoffset, buf->data + buf->offset, tailsize); - buf->offset = newoffset; - } - - buf->size = newsize; - return true; -} - -// Store data into the buffer -static ssize_t buffer_put_at(struct buffer *buf, size_t offset, const void *data, size_t len) { - debug(NULL, "buffer_put_at %lu %lu %lu\n", (unsigned long)buf->used, (unsigned long)offset, (unsigned long)len); - - // Ensure we don't store more than maxsize bytes in total - size_t required = offset + len; - - if(required > buf->maxsize) { - if(offset >= buf->maxsize) { - return 0; - } - - len = buf->maxsize - offset; - required = buf->maxsize; - } - - // Check if we need to resize the buffer - if(required > buf->size) { - size_t newsize = buf->size; - - if(!newsize) { - newsize = 4096; - } - - do { - newsize *= 2; - } while(newsize < required); - - if(newsize > buf->maxsize) { - newsize = buf->maxsize; - } - - if(!buffer_resize(buf, newsize)) { - return -1; - } - } - - uint32_t realoffset = buf->offset + offset; - - if(buf->size - buf->offset <= offset) { - // The offset wrapped - realoffset -= buf->size; - } - - if(buf->size - realoffset < len) { - // The new chunk of data must be wrapped - memcpy(buf->data + realoffset, data, buf->size - realoffset); - memcpy(buf->data, (char *)data + buf->size - realoffset, len - (buf->size - realoffset)); - } else { - memcpy(buf->data + realoffset, data, len); - } - - if(required > buf->used) { - buf->used = required; - } - - return len; -} - -static ssize_t buffer_put(struct buffer *buf, const void *data, size_t len) { - return buffer_put_at(buf, buf->used, data, len); -} - -// Copy data from the buffer without removing it. -static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t len) { - // Ensure we don't copy more than is actually stored in the buffer - if(offset >= buf->used) { - return 0; - } - - if(buf->used - offset < len) { - len = buf->used - offset; - } - - uint32_t realoffset = buf->offset + offset; - - if(buf->size - buf->offset <= offset) { - // The offset wrapped - realoffset -= buf->size; - } - - if(buf->size - realoffset < len) { - // The data is wrapped - memcpy(data, buf->data + realoffset, buf->size - realoffset); - memcpy((char *)data + buf->size - realoffset, buf->data, len - (buf->size - realoffset)); - } else { - memcpy(data, buf->data + realoffset, len); - } - - return len; -} - -// Discard data from the buffer. -static ssize_t buffer_discard(struct buffer *buf, size_t len) { - if(buf->used < len) { - len = buf->used; - } - - if(buf->size - buf->offset <= len) { - buf->offset -= buf->size; - } - - if(buf->used == len) { - buf->offset = 0; - } else { - buf->offset += len; - } - - buf->used -= len; - - return len; -} - -static void buffer_clear(struct buffer *buf) { - buf->used = 0; - buf->offset = 0; -} - -static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsize) { - if(maxsize < minsize) { - maxsize = minsize; - } - - buf->maxsize = maxsize; - - return buf->size >= minsize || buffer_resize(buf, minsize); -} - -static void buffer_transfer(struct buffer *buf, char *newdata, size_t newsize) { - if(buffer_wraps(buf)) { - // Old situation: - // [345......012] - // New situation: - // [012345......] - uint32_t tailsize = buf->size - buf->offset; - memcpy(newdata, buf->data + buf->offset, tailsize); - memcpy(newdata + tailsize, buf->data, buf->used - tailsize); - } else { - // Old situation: - // [....012345..] - // New situation: - // [012345......] - memcpy(newdata, buf->data + buf->offset, buf->used); - } - - buf->offset = 0; - buf->size = newsize; -} - -static void set_buffer_storage(struct buffer *buf, char *data, size_t size) { - if(size > UINT32_MAX) { - size = UINT32_MAX; - } - - buf->maxsize = size; - - if(data) { - if(buf->external) { - // Don't allow resizing an external buffer - abort(); - } - - if(size < buf->used) { - // Ignore requests for an external buffer if we are already using more than it can store - return; - } - - // Transition from internal to external buffer - buffer_transfer(buf, data, size); - free(buf->data); - buf->data = data; - buf->external = true; - } else if(buf->external) { - // Transition from external to internal buf - size_t minsize = buf->used <= DEFAULT_SNDBUFSIZE ? DEFAULT_SNDBUFSIZE : buf->used; - - if(minsize) { - data = malloc(minsize); - - if(!data) { - // Cannot handle this - abort(); - } - - buffer_transfer(buf, data, minsize); - buf->data = data; - } else { - buf->data = NULL; - buf->size = 0; - } - - buf->external = false; - } else { - // Don't do anything if the buffer wraps - if(buffer_wraps(buf)) { - return; - } - - // Realloc internal storage - size_t minsize = max(DEFAULT_SNDBUFSIZE, buf->offset + buf->used); - - if(minsize) { - data = realloc(buf->data, minsize); - - if(data) { - buf->data = data; - buf->size = minsize; - } - } else { - free(buf->data); - buf->data = NULL; - buf->size = 0; - } - } -} - -static void buffer_exit(struct buffer *buf) { - if(!buf->external) { - free(buf->data); - } - - memset(buf, 0, sizeof(*buf)); -} - -static uint32_t buffer_free(const struct buffer *buf) { - return buf->maxsize > buf->used ? buf->maxsize - buf->used : 0; -} - // Connections are stored in a sorted list. // This gives O(log(N)) lookup time, O(N log(N)) insertion time and O(N) deletion time. @@ -498,8 +230,6 @@ static void free_connection(struct utcp_connection *c) { memmove(cp, cp + 1, (utcp->nconnections - i - 1) * sizeof(*cp)); utcp->nconnections--; - buffer_exit(&c->rcvbuf); - buffer_exit(&c->sndbuf); free(c); } @@ -548,17 +278,6 @@ static struct utcp_connection *allocate_connection(struct utcp *utcp, uint16_t s return NULL; } - if(!buffer_set_size(&c->sndbuf, DEFAULT_SNDBUFSIZE, DEFAULT_MAXSNDBUFSIZE)) { - free(c); - return NULL; - } - - if(!buffer_set_size(&c->rcvbuf, DEFAULT_RCVBUFSIZE, DEFAULT_MAXRCVBUFSIZE)) { - buffer_exit(&c->sndbuf); - free(c); - return NULL; - } - // Fill in the details c->src = src; @@ -666,7 +385,7 @@ struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_re pkt.hdr.dst = c->dst; pkt.hdr.seq = c->snd.iss; pkt.hdr.ack = 0; - pkt.hdr.wnd = c->rcvbuf.maxsize; + pkt.hdr.wnd = c->utcp->mtu; pkt.hdr.ctl = SYN; pkt.hdr.aux = 0x0101; pkt.init[0] = 1; @@ -699,28 +418,7 @@ void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { set_state(c, ESTABLISHED); } -static void ack(struct utcp_connection *c, bool sendatleastone) { - int32_t left = seqdiff(c->snd.last, c->snd.nxt); - int32_t cwndleft = MAX_UNRELIABLE_SIZE; - - assert(left >= 0); - - if(cwndleft <= 0) { - left = 0; - } else if(cwndleft < left) { - left = cwndleft; - - if(!sendatleastone || cwndleft > c->utcp->mss) { - left -= left % c->utcp->mss; - } - } - - debug(c, "cwndleft %d left %d\n", cwndleft, left); - - if(!left && !sendatleastone) { - return; - } - +static void ack(struct utcp_connection *c, const void *data, size_t len) { struct { struct hdr hdr; uint8_t data[]; @@ -733,34 +431,31 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { pkt->hdr.ctl = ACK; pkt->hdr.aux = 0; - do { - uint32_t seglen = left > c->utcp->mss ? c->utcp->mss : left; - pkt->hdr.seq = c->snd.nxt; - - buffer_copy(&c->sndbuf, pkt->data, seqdiff(c->snd.nxt, c->snd.una), seglen); + uint32_t seglen = len; + pkt->hdr.seq = c->snd.nxt; - c->snd.nxt += seglen; - left -= seglen; + c->snd.nxt += seglen; - if(seglen && fin_wanted(c, c->snd.nxt)) { - seglen--; - pkt->hdr.ctl |= FIN; - } + if(fin_wanted(c, c->snd.nxt)) { + pkt->hdr.ctl |= FIN; + } - if(!c->rtt_start.tv_sec) { - // Start RTT measurement - clock_gettime(UTCP_CLOCK, &c->rtt_start); - c->rtt_seq = pkt->hdr.seq + seglen; - debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq); - } + if(data && len) { + assert(len <= c->utcp->mtu); + memcpy(pkt->data, data, len); + } else { + assert(!data && !len); + } - print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); - c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); + if(!c->rtt_start.tv_sec) { + // Start RTT measurement + clock_gettime(UTCP_CLOCK, &c->rtt_start); + c->rtt_seq = pkt->hdr.seq + seglen; + debug(c, "starting RTT measurement, expecting ack %u\n", c->rtt_seq); + } - if(left) { - pkt->hdr.wnd += seglen; - } - } while(left); + print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); + c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); } ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { @@ -804,15 +499,9 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { return -1; } - // Add data to send buffer. - - if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { - if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) { - errno = EMSGSIZE; - return -1; - } - } else { - return 0; + if(len > MAX_UNRELIABLE_SIZE || len > c->utcp->mtu) { + errno = EMSGSIZE; + return -1; } if(len <= 0) { @@ -827,10 +516,9 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { return len; } - ack(c, false); + ack(c, data, len); c->snd.una = c->snd.nxt = c->snd.last; - buffer_discard(&c->sndbuf, c->sndbuf.used); return len; } @@ -857,7 +545,7 @@ static void retransmit(struct utcp_connection *c) { pkt->hdr.src = c->src; pkt->hdr.dst = c->dst; - pkt->hdr.wnd = c->rcvbuf.maxsize; + pkt->hdr.wnd = c->utcp->mtu; pkt->hdr.aux = 0; switch(c->state) { @@ -1048,8 +736,6 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { ptr += 2; } - bool has_data = len || (hdr.ctl & (SYN | FIN)); - // Is it for a new connection? if(!c) { @@ -1104,7 +790,7 @@ synack: pkt.hdr.dst = c->dst; pkt.hdr.ack = c->rcv.irs + 1; pkt.hdr.seq = c->snd.iss; - pkt.hdr.wnd = c->rcvbuf.maxsize; + pkt.hdr.wnd = c->utcp->mtu; pkt.hdr.ctl = SYN | ACK; if(init) { @@ -1224,8 +910,6 @@ synack: // The peer has aborted our connection. set_state(c, CLOSED); errno = ECONNRESET; - buffer_clear(&c->sndbuf); - buffer_clear(&c->rcvbuf); if(c->recv) { c->recv(c, NULL, 0); @@ -1310,10 +994,6 @@ synack: assert(data_acked <= bufused); #endif - if(data_acked) { - buffer_discard(&c->sndbuf, data_acked); - } - // Also advance snd.nxt if possible if(seqdiff(c->snd.nxt, hdr.ack) < 0) { c->snd.nxt = hdr.ack; @@ -1337,8 +1017,8 @@ synack: c->snd.cwnd += max(1, (utcp->mss * utcp->mss) / c->snd.cwnd); // eq. 3 } - if(c->snd.cwnd > c->sndbuf.maxsize) { - c->snd.cwnd = c->sndbuf.maxsize; + if(c->snd.cwnd > c->utcp->mtu) { + c->snd.cwnd = c->utcp->mtu; } debug_cwnd(c); @@ -1538,7 +1218,7 @@ skip_ack: // -> sendatleastone = false if(hdr.ctl & SYN || hdr.ctl & FIN) { - ack(c, has_data); + ack(c, NULL, 0); } return 0; @@ -1630,7 +1310,7 @@ int utcp_shutdown(struct utcp_connection *c, int dir) { c->snd.last++; - ack(c, false); + ack(c, NULL, 0); if(!timespec_isset(&c->rtrx_timeout)) { start_retransmit_timer(c); @@ -1651,9 +1331,6 @@ static bool reset_connection(struct utcp_connection *c) { return false; } - buffer_clear(&c->sndbuf); - buffer_clear(&c->rcvbuf); - switch(c->state) { case CLOSED: return true; @@ -1693,9 +1370,6 @@ static bool reset_connection(struct utcp_connection *c) { } static void set_reapable(struct utcp_connection *c) { - set_buffer_storage(&c->sndbuf, NULL, min(c->sndbuf.maxsize, DEFAULT_MAXSNDBUFSIZE)); - set_buffer_storage(&c->rcvbuf, NULL, min(c->rcvbuf.maxsize, DEFAULT_MAXRCVBUFSIZE)); - c->recv = NULL; c->reapable = true; } @@ -1775,8 +1449,6 @@ struct timespec utcp_timeout(struct utcp *utcp) { if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &now)) { errno = ETIMEDOUT; c->state = CLOSED; - buffer_clear(&c->sndbuf); - buffer_clear(&c->rcvbuf); if(c->recv) { c->recv(c, NULL, 0); @@ -1862,16 +1534,11 @@ void utcp_exit(struct utcp *utcp) { struct utcp_connection *c = utcp->connections[i]; if(!c->reapable) { - buffer_clear(&c->sndbuf); - buffer_clear(&c->rcvbuf); - if(c->recv) { c->recv(c, NULL, 0); } } - buffer_exit(&c->rcvbuf); - buffer_exit(&c->sndbuf); free(c); } @@ -1957,63 +1624,6 @@ void utcp_set_user_timeout(struct utcp *u, int timeout) { } } -size_t utcp_get_sndbuf(struct utcp_connection *c) { - return c ? c->sndbuf.maxsize : 0; -} - -size_t utcp_get_sndbuf_free(struct utcp_connection *c) { - if(!c) { - return 0; - } - - switch(c->state) { - case SYN_SENT: - case SYN_RECEIVED: - case ESTABLISHED: - case CLOSE_WAIT: - return buffer_free(&c->sndbuf); - - default: - return 0; - } -} - -void utcp_set_sndbuf(struct utcp_connection *c, void *data, size_t size) { - if(!c) { - return; - } - - set_buffer_storage(&c->sndbuf, data, size); -} - -size_t utcp_get_rcvbuf(struct utcp_connection *c) { - return c ? c->rcvbuf.maxsize : 0; -} - -size_t utcp_get_rcvbuf_free(struct utcp_connection *c) { - if(c && (c->state == ESTABLISHED || c->state == CLOSE_WAIT)) { - return buffer_free(&c->rcvbuf); - } else { - return 0; - } -} - -void utcp_set_rcvbuf(struct utcp_connection *c, void *data, size_t size) { - if(!c) { - return; - } - - set_buffer_storage(&c->rcvbuf, data, size); -} - -size_t utcp_get_sendq(struct utcp_connection *c) { - return c->sndbuf.used; -} - -size_t utcp_get_recvq(struct utcp_connection *c) { - return c->rcvbuf.used; -} - bool utcp_get_nodelay(struct utcp_connection *c) { return c ? c->nodelay : false; } @@ -2034,10 +1644,6 @@ void utcp_set_keepalive(struct utcp_connection *c, bool keepalive) { } } -size_t utcp_get_outq(struct utcp_connection *c) { - return c ? seqdiff(c->snd.nxt, c->snd.una) : 0; -} - void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) { if(c) { c->recv = recv;