X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=utcp.c;h=0d102b4c7bcd063ff388209e7e8c1b9ff4c50fd3;hb=0c3abf9e3538c482639ed32b0e85104495db5b4f;hp=0b36de15263b8b005c0625f866e901054432b5ba;hpb=bf55733700d946f678947a6caaa782b56669a3f1;p=utcp diff --git a/utcp.c b/utcp.c index 0b36de1..0d102b4 100644 --- a/utcp.c +++ b/utcp.c @@ -43,6 +43,14 @@ #undef poll #endif +#ifndef UTCP_CLOCK +#if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__) +#define UTCP_CLOCK CLOCK_MONOTONIC_RAW +#else +#define UTCP_CLOCK CLOCK_MONOTONIC +#endif +#endif + static void timespec_sub(const struct timespec *a, const struct timespec *b, struct timespec *r) { r->tv_sec = a->tv_sec - b->tv_sec; r->tv_nsec = a->tv_nsec - b->tv_nsec; @@ -90,14 +98,6 @@ static inline size_t max(size_t a, size_t b) { #define UTCP_DEBUG_DATALEN 20 #endif -#ifndef UTCP_CLOCK -#if defined(CLOCK_MONOTONIC_RAW) && defined(__x86_64__) -#define UTCP_CLOCK CLOCK_MONOTONIC_RAW -#else -#define UTCP_CLOCK CLOCK_MONOTONIC -#endif -#endif - static void debug(struct utcp_connection *c, const char *format, ...) { struct timespec tv; char buf[1024]; @@ -220,7 +220,7 @@ static bool buffer_resize(struct buffer *buf, uint32_t newsize) { // [345.........|........012] uint32_t tailsize = buf->size - buf->offset; uint32_t newoffset = newsize - tailsize; - memmove(buf + newoffset, buf + buf->offset, tailsize); + memmove(buf->data + newoffset, buf->data + buf->offset, tailsize); buf->offset = newoffset; } @@ -320,6 +320,44 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t return len; } +// Copy data from the buffer without removing it. +static ssize_t buffer_call(struct buffer *buf, utcp_recv_t cb, void *arg, size_t offset, size_t len) { + // Ensure we don't copy more than is actually stored in the buffer + if(offset >= buf->used) { + return 0; + } + + if(buf->used - offset < len) { + len = buf->used - offset; + } + + uint32_t realoffset = buf->offset + offset; + + if(buf->size - buf->offset < offset) { + // The offset wrapped + realoffset -= buf->size; + } + + if(buf->size - realoffset < len) { + // The data is wrapped + ssize_t rx1 = cb(arg, buf->data + realoffset, buf->size - realoffset); + + if(rx1 < buf->size - realoffset) { + return rx1; + } + + ssize_t rx2 = cb(arg, buf->data, len - (buf->size - realoffset)); + + if(rx2 < 0) { + return rx2; + } else { + return rx1 + rx2; + } + } else { + return cb(arg, buf->data + realoffset, len); + } +} + // Discard data from the buffer. static ssize_t buffer_discard(struct buffer *buf, size_t len) { if(buf->used < len) { @@ -749,6 +787,7 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { if(!is_reliable(c)) { c->snd.una = c->snd.nxt = c->snd.last; buffer_discard(&c->sndbuf, c->sndbuf.used); + c->do_poll = true; } if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { @@ -1023,23 +1062,32 @@ static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, cons } static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { - // Check if we can process out-of-order data now. - if(c->sacks[0].len && len >= c->sacks[0].offset) { // TODO: handle overlap with second SACK - debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset); - buffer_put_at(&c->rcvbuf, 0, data, len); // TODO: handle return value - len = max(len, c->sacks[0].offset + c->sacks[0].len); - data = c->rcvbuf.data; - } - if(c->recv) { ssize_t rxd = c->recv(c, data, len); - if(rxd < 0 || (size_t)rxd != len) { + if(rxd != (ssize_t)len) { // TODO: handle the application not accepting all data. abort(); } } + // Check if we can process out-of-order data now. + if(c->sacks[0].len && len >= c->sacks[0].offset) { + debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset); + + if(len < c->sacks[0].offset + c->sacks[0].len) { + size_t offset = len; + len = c->sacks[0].offset + c->sacks[0].len; + size_t remainder = len - offset; + ssize_t rxd = buffer_call(&c->rcvbuf, c->recv, c, offset, remainder); + + if(rxd != (ssize_t)remainder) { + // TODO: handle the application not accepting all data. + abort(); + } + } + } + if(c->rcvbuf.used) { sack_consume(c, len); } @@ -1484,6 +1532,7 @@ synack: if(data_acked) { buffer_discard(&c->sndbuf, data_acked); + c->do_poll = true; } // Also advance snd.nxt if possible @@ -2001,8 +2050,9 @@ struct timespec utcp_timeout(struct utcp *utcp) { } if(c->poll) { - if((c->state == ESTABLISHED || c->state == CLOSE_WAIT)) { - uint32_t len = buffer_free(&c->sndbuf); + if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) { + c->do_poll = false; + uint32_t len = buffer_free(&c->sndbuf); if(len) { c->poll(c, len); @@ -2205,6 +2255,8 @@ void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { if(c->sndbuf.maxsize != size) { c->sndbuf.maxsize = -1; } + + c->do_poll = buffer_free(&c->sndbuf); } size_t utcp_get_rcvbuf(struct utcp_connection *c) { @@ -2272,6 +2324,7 @@ void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) { void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) { if(c) { c->poll = poll; + c->do_poll = buffer_free(&c->sndbuf); } }