meshlink_set_channel_rcvbuf(handle, channel, size);
}
+ /// Set the send buffer storage of a channel.
+ /** This function provides MeshLink with a send buffer allocated by the application.
+ *
+ * @param channel A handle for the channel.
+ * @param buf A pointer to the start of the buffer.
+ * If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ * @param size The size of the buffer.
+ */
+ void set_channel_sndbuf_storage(channel *channel, void *buf, size_t size) {
+ meshlink_set_channel_sndbuf_storage(handle, channel, buf, size);
+ }
+
+ /// Set the receive buffer storage of a channel.
+ /** This function provides MeshLink with a receive buffer allocated by the application.
+ *
+ * @param channel A handle for the channel.
+ * @param buf A pointer to the start of the buffer.
+ * If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ * @param size The size of the buffer.
+ */
+ void set_channel_rcvbuf_storage(channel *channel, void *buf, size_t size) {
+ meshlink_set_channel_rcvbuf_storage(handle, channel, buf, size);
+ }
+
/// Set the connection timeout used for channels to the given node.
/** This sets the timeout after which unresponsive channels will be reported as closed.
* The timeout is set for all current and future channels to the given node.
}
void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
- (void)mesh;
+ meshlink_set_channel_sndbuf_storage(mesh, channel, NULL, size);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ meshlink_set_channel_rcvbuf_storage(mesh, channel, NULL, size);
+}
+void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
if(!channel) {
meshlink_errno = MESHLINK_EINVAL;
return;
abort();
}
- utcp_set_sndbuf(channel->c, size);
+ utcp_set_sndbuf(channel->c, buf, size);
pthread_mutex_unlock(&mesh->mutex);
}
-void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
- (void)mesh;
-
+void meshlink_set_channel_rcvbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
if(!channel) {
meshlink_errno = MESHLINK_EINVAL;
return;
abort();
}
- utcp_set_rcvbuf(channel->c, size);
+ utcp_set_rcvbuf(channel->c, buf, size);
pthread_mutex_unlock(&mesh->mutex);
}
* @param mesh A handle which represents an instance of MeshLink.
* @param channel A handle for the channel.
* @param size The desired size for the send buffer.
- * If a NULL pointer is given, the callback will be disabled.
*/
void meshlink_set_channel_sndbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size);
* @param mesh A handle which represents an instance of MeshLink.
* @param channel A handle for the channel.
* @param size The desired size for the send buffer.
- * If a NULL pointer is given, the callback will be disabled.
*/
void meshlink_set_channel_rcvbuf(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t size);
+/// Set the send buffer storage of a channel.
+/** This function provides MeshLink with a send buffer allocated by the application.
+ * The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf.
+ *
+ * \memberof meshlink_channel
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel.
+ * @param buf A pointer to the start of the buffer.
+ * If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ * @param size The size of the buffer.
+ */
+void meshlink_set_channel_sndbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size);
+
+/// Set the receive buffer storage of a channel.
+/** This function provides MeshLink with a receive buffer allocated by the application.
+ * The buffer must be valid until the channel is closed or until this function is called again with a NULL pointer for @a buf.
+ *
+ * \memberof meshlink_channel
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel.
+ * @param buf A pointer to the start of the buffer.
+ * If a NULL pointer is given, MeshLink will use its own internal buffer again.
+ * @param size The size of the buffer.
+ */
+void meshlink_set_channel_rcvbuf_storage(struct meshlink_handle *mesh, struct meshlink_channel *channel, void *buf, size_t size);
+
/// Open a reliable stream channel to another node.
/** This function is called whenever a remote node wants to open a channel to the local node.
* The application then has to decide whether to accept or reject this channel.
meshlink_set_channel_listen_cb
meshlink_set_channel_poll_cb
meshlink_set_channel_rcvbuf
+meshlink_set_channel_rcvbuf_storage
meshlink_set_channel_receive_cb
meshlink_set_channel_sndbuf
+meshlink_set_channel_sndbuf_storage
meshlink_set_connection_try_cb
meshlink_set_default_blacklist
meshlink_set_dev_class_fast_retry_period
c = nc;
if(bufsize) {
- utcp_set_sndbuf(c, bufsize);
- utcp_set_rcvbuf(c, bufsize);
+ utcp_set_sndbuf(c, NULL, bufsize);
+ utcp_set_rcvbuf(c, NULL, bufsize);
}
utcp_set_accept_cb(c->utcp, NULL, NULL);
c = utcp_connect_ex(u, 1, do_recv, NULL, flags);
if(bufsize) {
- utcp_set_sndbuf(c, bufsize);
- utcp_set_rcvbuf(c, bufsize);
+ utcp_set_sndbuf(c, NULL, bufsize);
+ utcp_set_rcvbuf(c, NULL, bufsize);
}
}
}
static void buffer_exit(struct buffer *buf) {
- free(buf->data);
+ if(!buf->external) {
+ free(buf->data);
+ }
+
memset(buf, 0, sizeof(*buf));
}
}
}
-void utcp_set_sndbuf(struct utcp_connection *c, size_t size) {
- if(!c) {
- return;
+static void buffer_transfer(struct buffer *buf, char *newdata, size_t newsize) {
+ if(buffer_wraps(buf)) {
+ // Old situation:
+ // [345......012]
+ // New situation:
+ // [012345......]
+ uint32_t tailsize = buf->size - buf->offset;
+ memcpy(newdata, buf->data + buf->offset, tailsize);
+ memcpy(newdata + tailsize, buf->data, buf->used - buf->offset);
+ } else {
+ // Old situation:
+ // [....012345..]
+ // New situation:
+ // [012345......]
+ memcpy(newdata, buf->data + buf->offset, buf->used);
+ }
+
+ buf->offset = 0;
+ buf->size = newsize;
+}
+
+static void set_buffer_storage(struct buffer *buf, char *data, size_t size) {
+ if(size > UINT32_MAX) {
+ size = UINT32_MAX;
}
- c->sndbuf.maxsize = size;
+ buf->maxsize = size;
- if(c->sndbuf.maxsize != size) {
- c->sndbuf.maxsize = -1;
+ if(data) {
+ if(buf->external) {
+ // Don't allow resizing an external buffer
+ abort();
+ }
+
+ if(size < buf->used) {
+ // Ignore requests for an external buffer if we are already using more than it can store
+ return;
+ }
+
+ // Transition from internal to external buffer
+ buffer_transfer(buf, data, size);
+ free(buf->data);
+ buf->data = data;
+ buf->external = true;
+ } else if(buf->external) {
+ // Transition from external to internal buf
+ size_t minsize = buf->used < DEFAULT_SNDBUFSIZE ? DEFAULT_SNDBUFSIZE : buf->used;
+ data = malloc(minsize);
+
+ if(!data) {
+ // Cannot handle this
+ abort();
+ }
+
+ buffer_transfer(buf, data, minsize);
+ buf->data = data;
+ buf->external = false;
}
+}
+
+void utcp_set_sndbuf(struct utcp_connection *c, void *data, size_t size) {
+ if(!c) {
+ return;
+ }
+
+ set_buffer_storage(&c->sndbuf, data, size);
c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf);
}
}
}
-void utcp_set_rcvbuf(struct utcp_connection *c, size_t size) {
+void utcp_set_rcvbuf(struct utcp_connection *c, void *data, size_t size) {
if(!c) {
return;
}
- c->rcvbuf.maxsize = size;
-
- if(c->rcvbuf.maxsize != size) {
- c->rcvbuf.maxsize = -1;
- }
+ set_buffer_storage(&c->rcvbuf, data, size);
}
size_t utcp_get_sendq(struct utcp_connection *c) {
// Per-socket options
size_t utcp_get_sndbuf(struct utcp_connection *connection);
-void utcp_set_sndbuf(struct utcp_connection *connection, size_t size);
+void utcp_set_sndbuf(struct utcp_connection *connection, void *buf, size_t size);
size_t utcp_get_sndbuf_free(struct utcp_connection *connection);
size_t utcp_get_rcvbuf(struct utcp_connection *connection);
-void utcp_set_rcvbuf(struct utcp_connection *connection, size_t size);
+void utcp_set_rcvbuf(struct utcp_connection *connection, void *buf, size_t size);
size_t utcp_get_rcvbuf_free(struct utcp_connection *connection);
size_t utcp_get_sendq(struct utcp_connection *connection);
uint32_t used;
uint32_t size;
uint32_t maxsize;
+ bool external;
};
struct sack {
channels-aio \
channels-aio-cornercases \
channels-aio-fd \
+ channels-buffer-storage \
channels-cornercases \
channels-failure \
channels-fork \
channels-aio \
channels-aio-cornercases \
channels-aio-fd \
+ channels-buffer-storage \
channels-cornercases \
channels-failure \
channels-fork \
channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h
channels_aio_fd_LDADD = $(top_builddir)/src/libmeshlink.la
+channels_buffer_storage_SOURCES = channels-buffer-storage.c utils.c utils.h
+channels_buffer_storage_LDADD = $(top_builddir)/src/libmeshlink.la
+
channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
channels_no_partial_LDADD = $(top_builddir)/src/libmeshlink.la
--- /dev/null
+#ifdef NDEBUG
+#undef NDEBUG
+#endif
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <assert.h>
+
+#include "utils.h"
+#include "../src/meshlink.h"
+
+static struct sync_flag b_responded;
+static struct sync_flag aio_finished;
+
+static const size_t size = 25000000; // size of data to transfer
+
+static void a_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+ (void)mesh;
+ (void)channel;
+
+ if(len == 5 && !memcmp(data, "Hello", 5)) {
+ set_sync_flag(&b_responded, true);
+ }
+}
+
+static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+ assert(meshlink_channel_send(mesh, channel, data, len) == (ssize_t)len);
+}
+
+static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+ (void)mesh;
+ (void)channel;
+ (void)port;
+ (void)data;
+ (void)len;
+
+ return false;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+ printf("accept_cb: (from %s on port %u) ", channel->node->name, (unsigned int)port);
+
+ if(data) {
+ fwrite(data, 1, len, stdout);
+ }
+
+ printf("\n");
+
+ if(port != 7) {
+ return false;
+ }
+
+ meshlink_set_channel_receive_cb(mesh, channel, b_receive_cb);
+ meshlink_set_channel_sndbuf(mesh, channel, size);
+
+ if(data) {
+ b_receive_cb(mesh, channel, data, len);
+ }
+
+ return true;
+}
+
+static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
+ (void)len;
+
+ meshlink_set_channel_poll_cb(mesh, channel, NULL);
+
+ assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5);
+}
+
+static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+ (void)mesh;
+ (void)channel;
+ (void)data;
+ (void)len;
+ (void)priv;
+
+ set_sync_flag(&aio_finished, true);
+}
+
+int main(void) {
+ init_sync_flag(&b_responded);
+ init_sync_flag(&aio_finished);
+
+ meshlink_set_log_cb(NULL, MESHLINK_INFO, log_cb);
+
+ // Open two new meshlink instance.
+
+ meshlink_handle_t *mesh_a, *mesh_b;
+ open_meshlink_pair(&mesh_a, &mesh_b, "channels-buffer-storage");
+
+ // Set the callbacks.
+
+ meshlink_set_channel_accept_cb(mesh_a, reject_cb);
+ meshlink_set_channel_accept_cb(mesh_b, accept_cb);
+
+ // Start both instances
+
+ start_meshlink_pair(mesh_a, mesh_b);
+
+ // Open a channel from a to b.
+
+ meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
+ assert(b);
+
+ meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 7, a_receive_cb, NULL, 0);
+ assert(channel);
+
+ size_t buf_size = 1024 * 1024;
+ char *sndbuf = malloc(1024 * 1024);
+ assert(sndbuf);
+ char *rcvbuf = malloc(1024 * 1024);
+ assert(rcvbuf);
+
+ // Set external buffers
+
+ meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size);
+ meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size);
+
+ // Check that we can transition back and forth to external buffers
+
+ meshlink_set_channel_sndbuf_storage(mesh_a, channel, NULL, 4096);
+ meshlink_set_channel_rcvbuf(mesh_a, channel, 4096);
+
+ meshlink_set_channel_sndbuf_storage(mesh_a, channel, sndbuf, buf_size);
+ meshlink_set_channel_rcvbuf_storage(mesh_a, channel, rcvbuf, buf_size);
+
+ // Wait for the channel to finish connecting
+
+ meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
+ assert(wait_sync_flag(&b_responded, 20));
+
+ // Send a lot of data
+
+ char *outdata = malloc(size);
+ assert(outdata);
+
+ for(size_t i = 0; i < size; i++) {
+ outdata[i] = i;
+ }
+
+ char *indata = malloc(size);
+ assert(indata);
+
+ assert(meshlink_channel_aio_receive(mesh_a, channel, indata, size, aio_cb, NULL));
+ assert(meshlink_channel_aio_send(mesh_a, channel, outdata, size, NULL, NULL));
+ assert(wait_sync_flag(&aio_finished, 20));
+ assert(!memcmp(indata, outdata, size));
+
+ // Done
+
+ meshlink_channel_close(mesh_a, channel);
+
+ // Clean up.
+
+ free(indata);
+ free(outdata);
+ free(rcvbuf);
+ free(sndbuf);
+
+ close_meshlink_pair(mesh_a, mesh_b);
+}