From: Guus Sliepen Date: Thu, 18 Feb 2021 19:57:23 +0000 (+0100) Subject: Allow the application to provide channel send/receive buffers. X-Git-Url: https://git.meshlink.io/?a=commitdiff_plain;h=0fdc99b6a501992b8c1dea2d1a909363b5564d0d;p=meshlink Allow the application to provide channel send/receive buffers. This adds two new functions to allow the application to allocate send and/or receive buffers for channels: meshlink_set_channel_rcvbuf_storage(mesh, channel, buf, size) meshlink_set_channel_sndbuf_storage(mesh, channel, buf, size) --- diff --git a/src/meshlink++.h b/src/meshlink++.h index b7ba5b91..51bc5e37 100644 --- a/src/meshlink++.h +++ b/src/meshlink++.h @@ -898,6 +898,30 @@ public: meshlink_set_channel_flags(handle, channel, flags); } + /// Set the send buffer storage of a channel. + /** This function provides MeshLink with a send buffer allocated by the application. + * + * @param channel A handle for the channel. + * @param buf A pointer to the start of the buffer. + * If a NULL pointer is given, MeshLink will use its own internal buffer again. + * @param size The size of the buffer. + */ + void set_channel_sndbuf_storage(channel *channel, void *buf, size_t size) { + meshlink_set_channel_sndbuf_storage(handle, channel, buf, size); + } + + /// Set the receive buffer storage of a channel. + /** This function provides MeshLink with a receive buffer allocated by the application. + * + * @param channel A handle for the channel. + * @param buf A pointer to the start of the buffer. + * If a NULL pointer is given, MeshLink will use its own internal buffer again. + * @param size The size of the buffer. + */ + void set_channel_rcvbuf_storage(channel *channel, void *buf, size_t size) { + meshlink_set_channel_rcvbuf_storage(handle, channel, buf, size); + } + /// Set the connection timeout used for channels to the given node. /** This sets the timeout after which unresponsive channels will be reported as closed. * The timeout is set for all current and future channels to the given node. diff --git a/src/meshlink.c b/src/meshlink.c index fd8e2f98..1540df37 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -4055,6 +4055,14 @@ void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_ac } void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { + meshlink_set_channel_sndbuf_storage(mesh, channel, NULL, size); +} + +void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { + meshlink_set_channel_rcvbuf_storage(mesh, channel, NULL, size); +} + +void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL; return; @@ -4064,11 +4072,11 @@ void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *ch abort(); } - utcp_set_sndbuf(channel->c, size); + utcp_set_sndbuf(channel->c, buf, size); pthread_mutex_unlock(&mesh->mutex); } -void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { +void meshlink_set_channel_rcvbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL; return; @@ -4078,7 +4086,7 @@ void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *ch abort(); } - utcp_set_rcvbuf(channel->c, size); + utcp_set_rcvbuf(channel->c, buf, size); pthread_mutex_unlock(&mesh->mutex); } diff --git a/src/meshlink.h b/src/meshlink.h index b3ccf8f0..822e314b 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -1365,7 +1365,6 @@ void meshlink_set_channel_poll_cb(struct meshlink_handle *mesh, struct meshlink_ * @param mesh A handle which represents an instance of MeshLink. * @param channel A handle for the channel. * @param size The desired size for the send buffer. - * If a NULL pointer is given, the callback will be disabled. */ void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size); @@ -1377,10 +1376,35 @@ void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_c * @param mesh A handle which represents an instance of MeshLink. * @param channel A handle for the channel. * @param size The desired size for the send buffer. - * If a NULL pointer is given, the callback will be disabled. */ void meshlink_set_channel_rcvbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size); +/// Set the send buffer storage of a channel. +/** This function provides MeshLink with a send buffer allocated by the application. + * The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf. + * + * \memberof meshlink_channel + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel. + * @param buf A pointer to the start of the buffer. + * If a NULL pointer is given, MeshLink will use its own internal buffer again. + * @param size The size of the buffer. + */ +void meshlink_set_channel_sndbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size); + +/// Set the receive buffer storage of a channel. +/** This function provides MeshLink with a receive buffer allocated by the application. + * The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf. + * + * \memberof meshlink_channel + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel. + * @param buf A pointer to the start of the buffer. + * If a NULL pointer is given, MeshLink will use its own internal buffer again. + * @param size The size of the buffer. + */ +void meshlink_set_channel_rcvbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size); + /// Set the flags of a channel. /** This function allows changing some of the channel flags. * Currently only MESHLINK_CHANNEL_NO_PARTIAL and MESHLINK_CHANNEL_DROP_LATE are supported, other flags are ignored. diff --git a/src/meshlink.sym b/src/meshlink.sym index faa3517d..6c29df36 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -76,8 +76,10 @@ meshlink_set_channel_flags meshlink_set_channel_listen_cb meshlink_set_channel_poll_cb meshlink_set_channel_rcvbuf +meshlink_set_channel_rcvbuf_storage meshlink_set_channel_receive_cb meshlink_set_channel_sndbuf +meshlink_set_channel_sndbuf_storage meshlink_set_connection_try_cb meshlink_set_default_blacklist meshlink_set_dev_class_fast_retry_period diff --git a/src/utcp-test.c b/src/utcp-test.c index 564bfba8..a7c0534f 100644 --- a/src/utcp-test.c +++ b/src/utcp-test.c @@ -105,8 +105,8 @@ static void do_accept(struct utcp_connection *nc, uint16_t port) { c = nc; if(bufsize) { - utcp_set_sndbuf(c, bufsize); - utcp_set_rcvbuf(c, bufsize); + utcp_set_sndbuf(c, NULL, bufsize); + utcp_set_rcvbuf(c, NULL, bufsize); } utcp_set_accept_cb(c->utcp, NULL, NULL); @@ -299,8 +299,8 @@ int main(int argc, char *argv[]) { c = utcp_connect_ex(u, 1, do_recv, NULL, flags); if(bufsize) { - utcp_set_sndbuf(c, bufsize); - utcp_set_rcvbuf(c, bufsize); + utcp_set_sndbuf(c, NULL, bufsize); + utcp_set_rcvbuf(c, NULL, bufsize); } } diff --git a/src/utcp.c b/src/utcp.c index f237270f..9e2229ad 100644 --- a/src/utcp.c +++ b/src/utcp.c @@ -396,7 +396,10 @@ static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsi } static void buffer_exit(struct buffer *buf) { - free(buf->data); + if(!buf->external) { + free(buf->data); + } + memset(buf, 0, sizeof(*buf)); } @@ -2315,16 +2318,72 @@ size_t utcp_get_sndbuf_free(struct utcp_connection *c) { } } -void utcp_set_sndbuf(struct utcp_connection *c, size_t size) { - if(!c) { - return; +static void buffer_transfer(struct buffer *buf, char *newdata, size_t newsize) { + if(buffer_wraps(buf)) { + // Old situation: + // [345......012] + // New situation: + // [012345......] + uint32_t tailsize = buf->size - buf->offset; + memcpy(newdata, buf->data + buf->offset, tailsize); + memcpy(newdata + tailsize, buf->data, buf->used - buf->offset); + } else { + // Old situation: + // [....012345..] + // New situation: + // [012345......] + memcpy(newdata, buf->data + buf->offset, buf->used); + } + + buf->offset = 0; + buf->size = newsize; +} + +static void set_buffer_storage(struct buffer *buf, char *data, size_t size) { + if(size > UINT32_MAX) { + size = UINT32_MAX; } - c->sndbuf.maxsize = size; + buf->maxsize = size; - if(c->sndbuf.maxsize != size) { - c->sndbuf.maxsize = -1; + if(data) { + if(buf->external) { + // Don't allow resizing an external buffer + abort(); + } + + if(size < buf->used) { + // Ignore requests for an external buffer if we are already using more than it can store + return; + } + + // Transition from internal to external buffer + buffer_transfer(buf, data, size); + free(buf->data); + buf->data = data; + buf->external = true; + } else if(buf->external) { + // Transition from external to internal buf + size_t minsize = buf->used < DEFAULT_SNDBUFSIZE ? DEFAULT_SNDBUFSIZE : buf->used; + data = malloc(minsize); + + if(!data) { + // Cannot handle this + abort(); + } + + buffer_transfer(buf, data, minsize); + buf->data = data; + buf->external = false; } +} + +void utcp_set_sndbuf(struct utcp_connection *c, void *data, size_t size) { + if(!c) { + return; + } + + set_buffer_storage(&c->sndbuf, data, size); c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); } @@ -2341,16 +2400,12 @@ size_t utcp_get_rcvbuf_free(struct utcp_connection *c) { } } -void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) { +void utcp_set_rcvbuf(struct utcp_connection *c, void *data, size_t size) { if(!c) { return; } - c->rcvbuf.maxsize = size; - - if(c->rcvbuf.maxsize != size) { - c->rcvbuf.maxsize = -1; - } + set_buffer_storage(&c->rcvbuf, data, size); } size_t utcp_get_sendq(struct utcp_connection *c) { diff --git a/src/utcp.h b/src/utcp.h index 9921a375..12964152 100644 --- a/src/utcp.h +++ b/src/utcp.h @@ -99,11 +99,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit); // Per-socket options size_t utcp_get_sndbuf(struct utcp_connection *connection); -void utcp_set_sndbuf(struct utcp_connection *connection, size_t size); +void utcp_set_sndbuf(struct utcp_connection *connection, void *buf, size_t size); size_t utcp_get_sndbuf_free(struct utcp_connection *connection); size_t utcp_get_rcvbuf(struct utcp_connection *connection); -void utcp_set_rcvbuf(struct utcp_connection *connection, size_t size); +void utcp_set_rcvbuf(struct utcp_connection *connection, void *buf, size_t size); size_t utcp_get_rcvbuf_free(struct utcp_connection *connection); size_t utcp_get_sendq(struct utcp_connection *connection); diff --git a/src/utcp_priv.h b/src/utcp_priv.h index c9519458..78cc9f95 100644 --- a/src/utcp_priv.h +++ b/src/utcp_priv.h @@ -95,6 +95,7 @@ struct buffer { uint32_t used; uint32_t size; uint32_t maxsize; + bool external; }; struct sack { diff --git a/test/Makefile.am b/test/Makefile.am index e398eeb3..4329f189 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -6,6 +6,7 @@ TESTS = \ channels-aio \ channels-aio-cornercases \ channels-aio-fd \ + channels-buffer-storage \ channels-cornercases \ channels-failure \ channels-fork \ @@ -46,6 +47,7 @@ check_PROGRAMS = \ channels-aio \ channels-aio-cornercases \ channels-aio-fd \ + channels-buffer-storage \ channels-cornercases \ channels-failure \ channels-fork \ @@ -93,6 +95,9 @@ channels_aio_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h channels_aio_fd_LDADD = $(top_builddir)/src/libmeshlink.la +channels_buffer_storage_SOURCES = channels-buffer-storage.c utils.c utils.h +channels_buffer_storage_LDADD = $(top_builddir)/src/libmeshlink.la + channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h channels_no_partial_LDADD = $(top_builddir)/src/libmeshlink.la diff --git a/test/channels-buffer-storage.c b/test/channels-buffer-storage.c new file mode 100644 index 00000000..af7b25b8 --- /dev/null +++ b/test/channels-buffer-storage.c @@ -0,0 +1,165 @@ +#ifdef NDEBUG +#undef NDEBUG +#endif + +#include +#include +#include +#include +#include +#include + +#include "utils.h" +#include "../src/meshlink.h" + +static struct sync_flag b_responded; +static struct sync_flag aio_finished; + +static const size_t size = 25000000; // size of data to transfer + +static void a_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { + (void)mesh; + (void)channel; + + if(len == 5 && !memcmp(data, "Hello", 5)) { + set_sync_flag(&b_responded, true); + } +} + +static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { + assert(meshlink_channel_send(mesh, channel, data, len) == (ssize_t)len); +} + +static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + (void)mesh; + (void)channel; + (void)port; + (void)data; + (void)len; + + return false; +} + +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + printf("accept_cb: (from %s on port %u) ", channel->node->name, (unsigned int)port); + + if(data) { + fwrite(data, 1, len, stdout); + } + + printf("\n"); + + if(port != 7) { + return false; + } + + meshlink_set_channel_receive_cb(mesh, channel, b_receive_cb); + meshlink_set_channel_sndbuf(mesh, channel, size); + + if(data) { + b_receive_cb(mesh, channel, data, len); + } + + return true; +} + +static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) { + (void)len; + + meshlink_set_channel_poll_cb(mesh, channel, NULL); + + assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5); +} + +static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { + (void)mesh; + (void)channel; + (void)data; + (void)len; + (void)priv; + + set_sync_flag(&aio_finished, true); +} + +int main(void) { + init_sync_flag(&b_responded); + init_sync_flag(&aio_finished); + + meshlink_set_log_cb(NULL, MESHLINK_INFO, log_cb); + + // Open two new meshlink instance. + + meshlink_handle_t *mesh_a, *mesh_b; + open_meshlink_pair(&mesh_a, &mesh_b, "channels-buffer-storage"); + + // Set the callbacks. + + meshlink_set_channel_accept_cb(mesh_a, reject_cb); + meshlink_set_channel_accept_cb(mesh_b, accept_cb); + + // Start both instances + + start_meshlink_pair(mesh_a, mesh_b); + + // Open a channel from a to b. + + meshlink_node_t *b = meshlink_get_node(mesh_a, "b"); + assert(b); + + meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 7, a_receive_cb, NULL, 0); + assert(channel); + + size_t buf_size = 1024 * 1024; + char *sndbuf = malloc(1024 * 1024); + assert(sndbuf); + char *rcvbuf = malloc(1024 * 1024); + assert(rcvbuf); + + // Set external buffers + + meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size); + meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size); + + // Check that we can transition back and forth to external buffers + + meshlink_set_channel_sndbuf_storage(mesh_a, channel, NULL, 4096); + meshlink_set_channel_rcvbuf(mesh_a, channel, 4096); + + meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size); + meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size); + + // Wait for the channel to finish connecting + + meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb); + assert(wait_sync_flag(&b_responded, 20)); + + // Send a lot of data + + char *outdata = malloc(size); + assert(outdata); + + for(size_t i = 0; i < size; i++) { + outdata[i] = i; + } + + char *indata = malloc(size); + assert(indata); + + assert(meshlink_channel_aio_receive(mesh_a, channel, indata, size, aio_cb, NULL)); + assert(meshlink_channel_aio_send(mesh_a, channel, outdata, size, NULL, NULL)); + assert(wait_sync_flag(&aio_finished, 20)); + assert(!memcmp(indata, outdata, size)); + + // Done + + meshlink_channel_close(mesh_a, channel); + + // Clean up. + + free(indata); + free(outdata); + free(rcvbuf); + free(sndbuf); + + close_meshlink_pair(mesh_a, mesh_b); +}