]> git.meshlink.io Git - meshlink/commitdiff
Allow the application to provide channel send/receive buffers.
authorGuus Sliepen <guus@meshlink.io>
Thu, 18 Feb 2021 19:57:23 +0000 (20:57 +0100)
committerGuus Sliepen <guus@meshlink.io>
Wed, 3 Mar 2021 11:36:31 +0000 (12:36 +0100)
This adds two new functions to allow the application to allocate send
and/or receive buffers for channels:

meshlink_set_channel_rcvbuf_storage(mesh, channel, buf, size)
meshlink_set_channel_sndbuf_storage(mesh, channel, buf, size)

src/meshlink++.h
src/meshlink.c
src/meshlink.h
src/meshlink.sym
src/utcp-test.c
src/utcp.c
src/utcp.h
src/utcp_priv.h
test/Makefile.am
test/channels-buffer-storage.c [new file with mode: 0644]

index b7ba5b9111c746eb81961d591e916b26f714fa37..51bc5e37a91792523bfeb8633b951886f7e4a1b9 100644 (file)
@@ -898,6 +898,30 @@ public:
                meshlink_set_channel_flags(handle, channel, flags);
        }
 
+       /// Set the send buffer storage of a channel.
+       /** This function provides MeshLink with a send buffer allocated by the application.
+       *
+       *  @param channel   A handle for the channel.
+       *  @param buf       A pointer to the start of the buffer.
+       *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+       *  @param size      The size of the buffer.
+       */
+       void set_channel_sndbuf_storage(channel *channel, void *buf, size_t size) {
+               meshlink_set_channel_sndbuf_storage(handle, channel, buf, size);
+       }
+
+       /// Set the receive buffer storage of a channel.
+       /** This function provides MeshLink with a receive buffer allocated by the application.
+       *
+       *  @param channel   A handle for the channel.
+       *  @param buf       A pointer to the start of the buffer.
+       *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+       *  @param size      The size of the buffer.
+       */
+       void set_channel_rcvbuf_storage(channel *channel, void *buf, size_t size) {
+               meshlink_set_channel_rcvbuf_storage(handle, channel, buf, size);
+       }
+
        /// Set the connection timeout used for channels to the given node.
        /** This sets the timeout after which unresponsive channels will be reported as closed.
         *  The timeout is set for all current and future channels to the given node.
index fd8e2f9889f646d66e51f322eab7343abe49371f..1540df37344ace42cc87c70cc07160401fd914e7 100644 (file)
@@ -4055,6 +4055,14 @@ void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_ac
 }
 
 void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+       meshlink_set_channel_sndbuf_storage(mesh, channel, NULL, size);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+       meshlink_set_channel_rcvbuf_storage(mesh, channel, NULL, size);
+}
+
+void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
                return;
@@ -4064,11 +4072,11 @@ void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *ch
                abort();
        }
 
-       utcp_set_sndbuf(channel->c, size);
+       utcp_set_sndbuf(channel->c, buf, size);
        pthread_mutex_unlock(&mesh->mutex);
 }
 
-void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+void meshlink_set_channel_rcvbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
                return;
@@ -4078,7 +4086,7 @@ void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *ch
                abort();
        }
 
-       utcp_set_rcvbuf(channel->c, size);
+       utcp_set_rcvbuf(channel->c, buf, size);
        pthread_mutex_unlock(&mesh->mutex);
 }
 
index b3ccf8f0cf9b4ade4de87f28c15a36e2536d8cdd..822e314b68929a5186eb3cbbd046517b8576bb70 100644 (file)
@@ -1365,7 +1365,6 @@ void meshlink_set_channel_poll_cb(struct meshlink_handle *mesh, struct meshlink_
  *  @param mesh      A handle which represents an instance of MeshLink.
  *  @param channel   A handle for the channel.
  *  @param size      The desired size for the send buffer.
- *                   If a NULL pointer is given, the callback will be disabled.
  */
 void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size);
 
@@ -1377,10 +1376,35 @@ void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_c
  *  @param mesh      A handle which represents an instance of MeshLink.
  *  @param channel   A handle for the channel.
  *  @param size      The desired size for the send buffer.
- *                   If a NULL pointer is given, the callback will be disabled.
  */
 void meshlink_set_channel_rcvbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size);
 
+/// Set the send buffer storage of a channel.
+/** This function provides MeshLink with a send buffer allocated by the application.
+ *  The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf.
+ *
+ *  \memberof meshlink_channel
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel.
+ *  @param buf       A pointer to the start of the buffer.
+ *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ *  @param size      The size of the buffer.
+ */
+void meshlink_set_channel_sndbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size);
+
+/// Set the receive buffer storage of a channel.
+/** This function provides MeshLink with a receive buffer allocated by the application.
+ *  The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf.
+ *
+ *  \memberof meshlink_channel
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel.
+ *  @param buf       A pointer to the start of the buffer.
+ *                   If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ *  @param size      The size of the buffer.
+ */
+void meshlink_set_channel_rcvbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size);
+
 /// Set the flags of a channel.
 /** This function allows changing some of the channel flags.
  *  Currently only MESHLINK_CHANNEL_NO_PARTIAL and MESHLINK_CHANNEL_DROP_LATE are supported, other flags are ignored.
index faa3517d0ca400169245df0051c288d4603d3ee9..6c29df36b877b60735f1f35267ac4519cfa9169d 100644 (file)
@@ -76,8 +76,10 @@ meshlink_set_channel_flags
 meshlink_set_channel_listen_cb
 meshlink_set_channel_poll_cb
 meshlink_set_channel_rcvbuf
+meshlink_set_channel_rcvbuf_storage
 meshlink_set_channel_receive_cb
 meshlink_set_channel_sndbuf
+meshlink_set_channel_sndbuf_storage
 meshlink_set_connection_try_cb
 meshlink_set_default_blacklist
 meshlink_set_dev_class_fast_retry_period
index 564bfba8618472fdcffe646d9dcb54efeb937ce4..a7c0534f5703530b3d3204bea19012be5a96c93f 100644 (file)
@@ -105,8 +105,8 @@ static void do_accept(struct utcp_connection *nc, uint16_t port) {
        c = nc;
 
        if(bufsize) {
-               utcp_set_sndbuf(c, bufsize);
-               utcp_set_rcvbuf(c, bufsize);
+               utcp_set_sndbuf(c, NULL, bufsize);
+               utcp_set_rcvbuf(c, NULL, bufsize);
        }
 
        utcp_set_accept_cb(c->utcp, NULL, NULL);
@@ -299,8 +299,8 @@ int main(int argc, char *argv[]) {
                c = utcp_connect_ex(u, 1, do_recv, NULL, flags);
 
                if(bufsize) {
-                       utcp_set_sndbuf(c, bufsize);
-                       utcp_set_rcvbuf(c, bufsize);
+                       utcp_set_sndbuf(c, NULL, bufsize);
+                       utcp_set_rcvbuf(c, NULL, bufsize);
                }
        }
 
index f237270f85c7173610c84b447ea314f617fd79dd..9e2229ad522fb48b2d5431b6658e82671422680e 100644 (file)
@@ -396,7 +396,10 @@ static bool buffer_set_size(struct buffer *buf, uint32_t minsize, uint32_t maxsi
 }
 
 static void buffer_exit(struct buffer *buf) {
-       free(buf->data);
+       if(!buf->external) {
+               free(buf->data);
+       }
+
        memset(buf, 0, sizeof(*buf));
 }
 
@@ -2315,16 +2318,72 @@ size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
        }
 }
 
-void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
-       if(!c) {
-               return;
+static void buffer_transfer(struct buffer *buf, char *newdata, size_t newsize) {
+       if(buffer_wraps(buf)) {
+               // Old situation:
+               // [345......012]
+               // New situation:
+               // [012345......]
+               uint32_t tailsize = buf->size - buf->offset;
+               memcpy(newdata, buf->data + buf->offset, tailsize);
+               memcpy(newdata + tailsize, buf->data, buf->used - buf->offset);
+       } else {
+               // Old situation:
+               // [....012345..]
+               // New situation:
+               // [012345......]
+               memcpy(newdata, buf->data + buf->offset, buf->used);
+       }
+
+       buf->offset = 0;
+       buf->size = newsize;
+}
+
+static void set_buffer_storage(struct buffer *buf, char *data, size_t size) {
+       if(size > UINT32_MAX) {
+               size = UINT32_MAX;
        }
 
-       c->sndbuf.maxsize = size;
+       buf->maxsize = size;
 
-       if(c->sndbuf.maxsize != size) {
-               c->sndbuf.maxsize = -1;
+       if(data) {
+               if(buf->external) {
+                       // Don't allow resizing an external buffer
+                       abort();
+               }
+
+               if(size < buf->used) {
+                       // Ignore requests for an external buffer if we are already using more than it can store
+                       return;
+               }
+
+               // Transition from internal to external buffer
+               buffer_transfer(buf, data, size);
+               free(buf->data);
+               buf->data = data;
+               buf->external = true;
+       } else if(buf->external) {
+               // Transition from external to internal buf
+               size_t minsize = buf->used < DEFAULT_SNDBUFSIZE ? DEFAULT_SNDBUFSIZE : buf->used;
+               data = malloc(minsize);
+
+               if(!data) {
+                       // Cannot handle this
+                       abort();
+               }
+
+               buffer_transfer(buf, data, minsize);
+               buf->data = data;
+               buf->external = false;
        }
+}
+
+void utcp_set_sndbuf(struct utcp_connection *c, void *data, size_t size) {
+       if(!c) {
+               return;
+       }
+
+       set_buffer_storage(&c->sndbuf, data, size);
 
        c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
 }
@@ -2341,16 +2400,12 @@ size_t utcp_get_rcvbuf_free(struct utcp_connection *c) {
        }
 }
 
-void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
+void utcp_set_rcvbuf(struct utcp_connection *c, void *data, size_t size) {
        if(!c) {
                return;
        }
 
-       c->rcvbuf.maxsize = size;
-
-       if(c->rcvbuf.maxsize != size) {
-               c->rcvbuf.maxsize = -1;
-       }
+       set_buffer_storage(&c->rcvbuf, data, size);
 }
 
 size_t utcp_get_sendq(struct utcp_connection *c) {
index 9921a3755e252dce0bbceb25f986231f33e0d591..12964152420add6536efa500fcfd1d6ab709dc8d 100644 (file)
@@ -99,11 +99,11 @@ void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit);
 // Per-socket options
 
 size_t utcp_get_sndbuf(struct utcp_connection *connection);
-void utcp_set_sndbuf(struct utcp_connection *connection, size_t size);
+void utcp_set_sndbuf(struct utcp_connection *connection, void *buf, size_t size);
 size_t utcp_get_sndbuf_free(struct utcp_connection *connection);
 
 size_t utcp_get_rcvbuf(struct utcp_connection *connection);
-void utcp_set_rcvbuf(struct utcp_connection *connection, size_t size);
+void utcp_set_rcvbuf(struct utcp_connection *connection, void *buf, size_t size);
 size_t utcp_get_rcvbuf_free(struct utcp_connection *connection);
 
 size_t utcp_get_sendq(struct utcp_connection *connection);
index c95194589f6233e67e1bd7bd1dbf61409a60e259..78cc9f95cfebe7968d6aa75ee4011d540a1584ff 100644 (file)
@@ -95,6 +95,7 @@ struct buffer {
        uint32_t used;
        uint32_t size;
        uint32_t maxsize;
+       bool external;
 };
 
 struct sack {
index e398eeb3bf8a3a5208fc053f1f44dad3ce832533..4329f189a8b1b3fead4c3b82f5070bad08eae856 100644 (file)
@@ -6,6 +6,7 @@ TESTS = \
        channels-aio \
        channels-aio-cornercases \
        channels-aio-fd \
+       channels-buffer-storage \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -46,6 +47,7 @@ check_PROGRAMS = \
        channels-aio \
        channels-aio-cornercases \
        channels-aio-fd \
+       channels-buffer-storage \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -93,6 +95,9 @@ channels_aio_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la
 channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h
 channels_aio_fd_LDADD = $(top_builddir)/src/libmeshlink.la
 
+channels_buffer_storage_SOURCES = channels-buffer-storage.c utils.c utils.h
+channels_buffer_storage_LDADD = $(top_builddir)/src/libmeshlink.la
+
 channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
 channels_no_partial_LDADD = $(top_builddir)/src/libmeshlink.la
 
diff --git a/test/channels-buffer-storage.c b/test/channels-buffer-storage.c
new file mode 100644 (file)
index 0000000..af7b25b
--- /dev/null
@@ -0,0 +1,165 @@
+#ifdef NDEBUG
+#undef NDEBUG
+#endif
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <assert.h>
+
+#include "utils.h"
+#include "../src/meshlink.h"
+
+static struct sync_flag b_responded;
+static struct sync_flag aio_finished;
+
+static const size_t size = 25000000; // size of data to transfer
+
+static void a_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       (void)mesh;
+       (void)channel;
+
+       if(len == 5 && !memcmp(data, "Hello", 5)) {
+               set_sync_flag(&b_responded, true);
+       }
+}
+
+static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       assert(meshlink_channel_send(mesh, channel, data, len) == (ssize_t)len);
+}
+
+static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       (void)mesh;
+       (void)channel;
+       (void)port;
+       (void)data;
+       (void)len;
+
+       return false;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       printf("accept_cb: (from %s on port %u) ", channel->node->name, (unsigned int)port);
+
+       if(data) {
+               fwrite(data, 1, len, stdout);
+       }
+
+       printf("\n");
+
+       if(port != 7) {
+               return false;
+       }
+
+       meshlink_set_channel_receive_cb(mesh, channel, b_receive_cb);
+       meshlink_set_channel_sndbuf(mesh, channel, size);
+
+       if(data) {
+               b_receive_cb(mesh, channel, data, len);
+       }
+
+       return true;
+}
+
+static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
+       (void)len;
+
+       meshlink_set_channel_poll_cb(mesh, channel, NULL);
+
+       assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5);
+}
+
+static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+       (void)mesh;
+       (void)channel;
+       (void)data;
+       (void)len;
+       (void)priv;
+
+       set_sync_flag(&aio_finished, true);
+}
+
+int main(void) {
+       init_sync_flag(&b_responded);
+       init_sync_flag(&aio_finished);
+
+       meshlink_set_log_cb(NULL, MESHLINK_INFO, log_cb);
+
+       // Open two new meshlink instance.
+
+       meshlink_handle_t *mesh_a, *mesh_b;
+       open_meshlink_pair(&mesh_a, &mesh_b, "channels-buffer-storage");
+
+       // Set the callbacks.
+
+       meshlink_set_channel_accept_cb(mesh_a, reject_cb);
+       meshlink_set_channel_accept_cb(mesh_b, accept_cb);
+
+       // Start both instances
+
+       start_meshlink_pair(mesh_a, mesh_b);
+
+       // Open a channel from a to b.
+
+       meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
+       assert(b);
+
+       meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 7, a_receive_cb, NULL, 0);
+       assert(channel);
+
+       size_t buf_size = 1024 * 1024;
+       char *sndbuf = malloc(1024 * 1024);
+       assert(sndbuf);
+       char *rcvbuf = malloc(1024 * 1024);
+       assert(rcvbuf);
+
+       // Set external buffers
+
+       meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size);
+       meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size);
+
+       // Check that we can transition back and forth to external buffers
+
+       meshlink_set_channel_sndbuf_storage(mesh_a, channel, NULL, 4096);
+       meshlink_set_channel_rcvbuf(mesh_a, channel, 4096);
+
+       meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size);
+       meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size);
+
+       // Wait for the channel to finish connecting
+
+       meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
+       assert(wait_sync_flag(&b_responded, 20));
+
+       // Send a lot of data
+
+       char *outdata = malloc(size);
+       assert(outdata);
+
+       for(size_t i = 0; i < size; i++) {
+               outdata[i] = i;
+       }
+
+       char *indata = malloc(size);
+       assert(indata);
+
+       assert(meshlink_channel_aio_receive(mesh_a, channel, indata, size, aio_cb, NULL));
+       assert(meshlink_channel_aio_send(mesh_a, channel, outdata, size, NULL, NULL));
+       assert(wait_sync_flag(&aio_finished, 20));
+       assert(!memcmp(indata, outdata, size));
+
+       // Done
+
+       meshlink_channel_close(mesh_a, channel);
+
+       // Clean up.
+
+       free(indata);
+       free(outdata);
+       free(rcvbuf);
+       free(sndbuf);
+
+       close_meshlink_pair(mesh_a, mesh_b);
+}