]> git.meshlink.io Git - meshlink/commitdiff
Allow the application to provide channel send/receive buffers. feature/set-buffer-storage
authorGuus Sliepen <guus@meshlink.io>
Thu, 18 Feb 2021 19:57:23 +0000 (20:57 +0100)
committerGuus Sliepen <guus@meshlink.io>
Thu, 18 Feb 2021 19:58:10 +0000 (20:58 +0100)
This adds two new functions to allow the application to allocate send
and/or receive buffers for channels:

meshlink_set_channel_rcvbuf_storage(mesh, channel, buf, size)
meshlink_set_channel_sndbuf_storage(mesh, channel, buf, size)

src/meshlink++.h
src/meshlink.c
src/meshlink.h
src/meshlink.sym
src/utcp-test.c
src/utcp.c
src/utcp.h
src/utcp_priv.h
test/Makefile.am
test/channels-buffer-storage.c [new file with mode: 0644]

index 551913d4ba2a94ceb4df51852dd8595ce563a28d..39ba5512eecb4274b69306c7eab3b2a4757c51fd 100644 (file)
@@ -885,6 +885,30 @@ public:
                meshlink_set_channel_rcvbuf(handle, channel, size);
        }
 
+       /// Set the send buffer storage of a channel.
+       /** This function provides MeshLink with a send buffer allocated by the application.
+       *
+       *  @param channel   A handle for the channel.
+       *  @param buf       A pointer to the start of the buffer.
+       *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+       *  @param size      The size of the buffer.
+       */
+       void set_channel_sndbuf_storage(channel *channel, void *buf, size_t size) {
+               meshlink_set_channel_sndbuf_storage(handle, channel, buf, size);
+       }
+
+       /// Set the receive buffer storage of a channel.
+       /** This function provides MeshLink with a receive buffer allocated by the application.
+       *
+       *  @param channel   A handle for the channel.
+       *  @param buf       A pointer to the start of the buffer.
+       *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+       *  @param size      The size of the buffer.
+       */
+       void set_channel_rcvbuf_storage(channel *channel, void *buf, size_t size) {
+               meshlink_set_channel_rcvbuf_storage(handle, channel, buf, size);
+       }
+
        /// Set the connection timeout used for channels to the given node.
        /** This sets the timeout after which unresponsive channels will be reported as closed.
         *  The timeout is set for all current and future channels to the given node.
index 009839314cba71ca319455d657896c4d8bcd9cd1..1f1b3dfd8d9d5b5728e7a3cb49572bbcea894c41 100644 (file)
@@ -4052,8 +4052,14 @@ void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_ac
 }
 
 void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
-       (void)mesh;
+       meshlink_set_channel_sndbuf_storage(mesh, channel, NULL, size);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+       meshlink_set_channel_rcvbuf_storage(mesh, channel, NULL, size);
+}
 
+void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
        if(!channel) {
                meshlink_errno = MESHLINK_EINVAL;
                return;
@@ -4063,13 +4069,11 @@ void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *ch
                abort();
        }
 
-       utcp_set_sndbuf(channel->c, size);
+       utcp_set_sndbuf(channel->c, buf, size);
        pthread_mutex_unlock(&mesh->mutex);
 }
 
-void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
-       (void)mesh;
-
+void meshlink_set_channel_rcvbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
        if(!channel) {
                meshlink_errno = MESHLINK_EINVAL;
                return;
@@ -4079,7 +4083,7 @@ void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *ch
                abort();
        }
 
-       utcp_set_rcvbuf(channel->c, size);
+       utcp_set_rcvbuf(channel->c, buf, size);
        pthread_mutex_unlock(&mesh->mutex);
 }
 
index 1acd99b0663bd021ba6fc70a4e0ecfcb48ff319a..4ca6a0c2418f7ef37f92e56307794ee9b30863c0 100644 (file)
@@ -1365,7 +1365,6 @@ void meshlink_set_channel_poll_cb(struct meshlink_handle *mesh, struct meshlink_
  *  @param mesh      A handle which represents an instance of MeshLink.
  *  @param channel   A handle for the channel.
  *  @param size      The desired size for the send buffer.
- *                   If a NULL pointer is given, the callback will be disabled.
  */
 void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size);
 
@@ -1377,10 +1376,35 @@ void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_c
  *  @param mesh      A handle which represents an instance of MeshLink.
  *  @param channel   A handle for the channel.
  *  @param size      The desired size for the send buffer.
- *                   If a NULL pointer is given, the callback will be disabled.
  */
 void meshlink_set_channel_rcvbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size);
 
+/// Set the send buffer storage of a channel.
+/** This function provides MeshLink with a send buffer allocated by the application.
+ *  The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf.
+ *
+ *  \memberof meshlink_channel
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel.
+ *  @param buf       A pointer to the start of the buffer.
+ *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ *  @param size      The size of the buffer.
+ */
+void meshlink_set_channel_sndbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size);
+
+/// Set the receive buffer storage of a channel.
+/** This function provides MeshLink with a receive buffer allocated by the application.
+ *  The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf.
+ *
+ *  \memberof meshlink_channel
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel.
+ *  @param buf       A pointer to the start of the buffer.
+ *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ *  @param size      The size of the buffer.
+ */
+void meshlink_set_channel_rcvbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size);
+
 /// Open a reliable stream channel to another node.
 /** This function is called whenever a remote node wants to open a channel to the local node.
  *  The application then has to decide whether to accept or reject this channel.
index 51511ee9e1d5ed9a1eb8680306ac09f386657160..c296d3dece86af1978357772ac39bc37ae96a78f 100644 (file)
@@ -75,8 +75,10 @@ meshlink_set_channel_accept_cb
 meshlink_set_channel_listen_cb
 meshlink_set_channel_poll_cb
 meshlink_set_channel_rcvbuf
+meshlink_set_channel_rcvbuf_storage
 meshlink_set_channel_receive_cb
 meshlink_set_channel_sndbuf
+meshlink_set_channel_sndbuf_storage
 meshlink_set_connection_try_cb
 meshlink_set_default_blacklist
 meshlink_set_dev_class_fast_retry_period
index 564bfba8618472fdcffe646d9dcb54efeb937ce4..a7c0534f5703530b3d3204bea19012be5a96c93f 100644 (file)
@@ -105,8 +105,8 @@ static void do_accept(struct utcp_connection *nc, uint16_t port) {
        c = nc;
 
        if(bufsize) {
-               utcp_set_sndbuf(c, bufsize);
-               utcp_set_rcvbuf(c, bufsize);
+               utcp_set_sndbuf(c, NULL, bufsize);
+               utcp_set_rcvbuf(c, NULL, bufsize);
        }
 
        utcp_set_accept_cb(c->utcp, NULL, NULL);
@@ -299,8 +299,8 @@ int main(int argc, char *argv[]) {
                c = utcp_connect_ex(u, 1, do_recv, NULL, flags);
 
                if(bufsize) {
-                       utcp_set_sndbuf(c, bufsize);
-                       utcp_set_rcvbuf(c, bufsize);
+                       utcp_set_sndbuf(c, NULL, bufsize);
+                       utcp_set_rcvbuf(c, NULL, bufsize);
                }
        }
 
index 20dd0aba049757ca30833779ec772c63ba685310..e4a6165956909f165a1f34244d1eafbfb0e27a6d 100644 (file)
@@ -396,7 +396,10 @@ static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsi
 }
 
 static void buffer_exit(struct buffer *buf) {
-       free(buf->data);
+       if(!buf->external) {
+               free(buf->data);
+       }
+
        memset(buf, 0, sizeof(*buf));
 }
 
@@ -2315,16 +2318,72 @@ size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
        }
 }
 
-void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
-       if(!c) {
-               return;
+static void buffer_transfer(struct buffer *buf, char *newdata, size_t newsize) {
+       if(buffer_wraps(buf)) {
+               // Old situation:
+               // [345......012]
+               // New situation:
+               // [012345......]
+               uint32_t tailsize = buf->size - buf->offset;
+               memcpy(newdata, buf->data + buf->offset, tailsize);
+               memcpy(newdata + tailsize, buf->data, buf->used - buf->offset);
+       } else {
+               // Old situation:
+               // [....012345..]
+               // New situation:
+               // [012345......]
+               memcpy(newdata, buf->data + buf->offset, buf->used);
+       }
+
+       buf->offset = 0;
+       buf->size = newsize;
+}
+
+static void set_buffer_storage(struct buffer *buf, char *data, size_t size) {
+       if(size > UINT32_MAX) {
+               size = UINT32_MAX;
        }
 
-       c->sndbuf.maxsize = size;
+       buf->maxsize = size;
 
-       if(c->sndbuf.maxsize != size) {
-               c->sndbuf.maxsize = -1;
+       if(data) {
+               if(buf->external) {
+                       // Don't allow resizing an external buffer
+                       abort();
+               }
+
+               if(size < buf->used) {
+                       // Ignore requests for an external buffer if we are already using more than it can store
+                       return;
+               }
+
+               // Transition from internal to external buffer
+               buffer_transfer(buf, data, size);
+               free(buf->data);
+               buf->data = data;
+               buf->external = true;
+       } else if(buf->external) {
+               // Transition from external to internal buf
+               size_t minsize = buf->used < DEFAULT_SNDBUFSIZE ? DEFAULT_SNDBUFSIZE : buf->used;
+               data = malloc(minsize);
+
+               if(!data) {
+                       // Cannot handle this
+                       abort();
+               }
+
+               buffer_transfer(buf, data, minsize);
+               buf->data = data;
+               buf->external = false;
        }
+}
+
+void utcp_set_sndbuf(struct utcp_connection *c, void *data, size_t size) {
+       if(!c) {
+               return;
+       }
+
+       set_buffer_storage(&c->sndbuf, data, size);
 
        c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
 }
@@ -2341,16 +2400,12 @@ size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
        }
 }
 
-void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
+void utcp_set_rcvbuf(struct utcp_connection *c, void *data, size_t size) {
        if(!c) {
                return;
        }
 
-       c->rcvbuf.maxsize = size;
-
-       if(c->rcvbuf.maxsize != size) {
-               c->rcvbuf.maxsize = -1;
-       }
+       set_buffer_storage(&c->rcvbuf, data, size);
 }
 
 size_t utcp_get_sendq(struct utcp_connection *c) {
index 2c537014da503548ea0633527ec38b7750691f9b..5fa53152bfefed44543c6dcd1a28b91bb0edcdee 100644 (file)
@@ -98,11 +98,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit);
 // Per-socket options
 
 size_t utcp_get_sndbuf(struct utcp_connection *connection);
-void utcp_set_sndbuf(struct utcp_connection *connection, size_t size);
+void utcp_set_sndbuf(struct utcp_connection *connection, void *buf, size_t size);
 size_t utcp_get_sndbuf_free(struct utcp_connection *connection);
 
 size_t utcp_get_rcvbuf(struct utcp_connection *connection);
-void utcp_set_rcvbuf(struct utcp_connection *connection, size_t size);
+void utcp_set_rcvbuf(struct utcp_connection *connection, void *buf, size_t size);
 size_t utcp_get_rcvbuf_free(struct utcp_connection *connection);
 
 size_t utcp_get_sendq(struct utcp_connection *connection);
index c95194589f6233e67e1bd7bd1dbf61409a60e259..78cc9f95cfebe7968d6aa75ee4011d540a1584ff 100644 (file)
@@ -95,6 +95,7 @@ struct buffer {
        uint32_t used;
        uint32_t size;
        uint32_t maxsize;
+       bool external;
 };
 
 struct sack {
index e398eeb3bf8a3a5208fc053f1f44dad3ce832533..4329f189a8b1b3fead4c3b82f5070bad08eae856 100644 (file)
@@ -6,6 +6,7 @@ TESTS = \
        channels-aio \
        channels-aio-cornercases \
        channels-aio-fd \
+       channels-buffer-storage \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -46,6 +47,7 @@ check_PROGRAMS = \
        channels-aio \
        channels-aio-cornercases \
        channels-aio-fd \
+       channels-buffer-storage \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -93,6 +95,9 @@ channels_aio_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la
 channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h
 channels_aio_fd_LDADD = $(top_builddir)/src/libmeshlink.la
 
+channels_buffer_storage_SOURCES = channels-buffer-storage.c utils.c utils.h
+channels_buffer_storage_LDADD = $(top_builddir)/src/libmeshlink.la
+
 channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
 channels_no_partial_LDADD = $(top_builddir)/src/libmeshlink.la
 
diff --git a/test/channels-buffer-storage.c b/test/channels-buffer-storage.c
new file mode 100644 (file)
index 0000000..af7b25b
--- /dev/null
@@ -0,0 +1,165 @@
+#ifdef NDEBUG
+#undef NDEBUG
+#endif
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <assert.h>
+
+#include "utils.h"
+#include "../src/meshlink.h"
+
+static struct sync_flag b_responded;
+static struct sync_flag aio_finished;
+
+static const size_t size = 25000000; // size of data to transfer
+
+static void a_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       (void)mesh;
+       (void)channel;
+
+       if(len == 5 && !memcmp(data, "Hello", 5)) {
+               set_sync_flag(&b_responded, true);
+       }
+}
+
+static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       assert(meshlink_channel_send(mesh, channel, data, len) == (ssize_t)len);
+}
+
+static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       (void)mesh;
+       (void)channel;
+       (void)port;
+       (void)data;
+       (void)len;
+
+       return false;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       printf("accept_cb: (from %s on port %u) ", channel->node->name, (unsigned int)port);
+
+       if(data) {
+               fwrite(data, 1, len, stdout);
+       }
+
+       printf("\n");
+
+       if(port != 7) {
+               return false;
+       }
+
+       meshlink_set_channel_receive_cb(mesh, channel, b_receive_cb);
+       meshlink_set_channel_sndbuf(mesh, channel, size);
+
+       if(data) {
+               b_receive_cb(mesh, channel, data, len);
+       }
+
+       return true;
+}
+
+static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
+       (void)len;
+
+       meshlink_set_channel_poll_cb(mesh, channel, NULL);
+
+       assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5);
+}
+
+static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+       (void)mesh;
+       (void)channel;
+       (void)data;
+       (void)len;
+       (void)priv;
+
+       set_sync_flag(&aio_finished, true);
+}
+
+int main(void) {
+       init_sync_flag(&b_responded);
+       init_sync_flag(&aio_finished);
+
+       meshlink_set_log_cb(NULL, MESHLINK_INFO, log_cb);
+
+       // Open two new meshlink instance.
+
+       meshlink_handle_t *mesh_a, *mesh_b;
+       open_meshlink_pair(&mesh_a, &mesh_b, "channels-buffer-storage");
+
+       // Set the callbacks.
+
+       meshlink_set_channel_accept_cb(mesh_a, reject_cb);
+       meshlink_set_channel_accept_cb(mesh_b, accept_cb);
+
+       // Start both instances
+
+       start_meshlink_pair(mesh_a, mesh_b);
+
+       // Open a channel from a to b.
+
+       meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
+       assert(b);
+
+       meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 7, a_receive_cb, NULL, 0);
+       assert(channel);
+
+       size_t buf_size = 1024 * 1024;
+       char *sndbuf = malloc(1024 * 1024);
+       assert(sndbuf);
+       char *rcvbuf = malloc(1024 * 1024);
+       assert(rcvbuf);
+
+       // Set external buffers
+
+       meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size);
+       meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size);
+
+       // Check that we can transition back and forth to external buffers
+
+       meshlink_set_channel_sndbuf_storage(mesh_a, channel, NULL, 4096);
+       meshlink_set_channel_rcvbuf(mesh_a, channel, 4096);
+
+       meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size);
+       meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size);
+
+       // Wait for the channel to finish connecting
+
+       meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
+       assert(wait_sync_flag(&b_responded, 20));
+
+       // Send a lot of data
+
+       char *outdata = malloc(size);
+       assert(outdata);
+
+       for(size_t i = 0; i < size; i++) {
+               outdata[i] = i;
+       }
+
+       char *indata = malloc(size);
+       assert(indata);
+
+       assert(meshlink_channel_aio_receive(mesh_a, channel, indata, size, aio_cb, NULL));
+       assert(meshlink_channel_aio_send(mesh_a, channel, outdata, size, NULL, NULL));
+       assert(wait_sync_flag(&aio_finished, 20));
+       assert(!memcmp(indata, outdata, size));
+
+       // Done
+
+       meshlink_channel_close(mesh_a, channel);
+
+       // Clean up.
+
+       free(indata);
+       free(outdata);
+       free(rcvbuf);
+       free(sndbuf);
+
+       close_meshlink_pair(mesh_a, mesh_b);
+}