From d5112a0a5e7036957f22c604e767e33c4b10b775 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Mon, 12 Aug 2019 13:43:01 +0200 Subject: [PATCH] Add meshlink_channel_aio_send(). This function allows handing over a large amount of data to MeshLink which will be sent without needing intervention from the application. A callback is called when MeshLink is done with the data, so the application can call any cleanup function it needs to call. --- src/meshlink++.h | 28 ++++++++++++ src/meshlink.c | 97 ++++++++++++++++++++++++++++++++++++++--- src/meshlink.h | 28 ++++++++++++ src/meshlink.sym | 1 + src/meshlink_internal.h | 11 +++++ 5 files changed, 160 insertions(+), 5 deletions(-) diff --git a/src/meshlink++.h b/src/meshlink++.h index 2dcac002..fbefcea8 100644 --- a/src/meshlink++.h +++ b/src/meshlink++.h @@ -98,6 +98,18 @@ typedef void (*channel_receive_cb_t)(mesh *mesh, channel *channel, const void *d */ typedef void (*channel_poll_cb_t)(mesh *mesh, channel *channel, size_t len); +/// A callback for cleaning up buffers submitted for asynchronous I/O. +/** This callbacks signals that MeshLink has finished using this buffer. + * The ownership of the buffer is now back into the application's hands. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel which used this buffer. + * @param data A pointer to a buffer containing the enqueued data. + * @param len The length of the buffer. + * @param priv A private pointer which was set by the application when submitting the buffer. + */ +typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv); + /// A class describing a MeshLink node. class node: public meshlink_node_t { }; @@ -722,6 +734,22 @@ public: return meshlink_channel_send(handle, channel, data, len); } + /// Transmit data on a channel asynchronously + /** This queues data to send to the remote node. + * + * @param channel A handle for the channel. + * @param data A pointer to a buffer containing data sent by the source, or NULL if there is no data to send. + * After meshlink_channel_aio_send() returns, the buffer may not be modified or freed by the application + * until the callback routine is called. + * @param len The length of the data, or 0 if there is no data to send. + * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. + * + * @return True if the buffer was enqueued, false otherwise. + */ + bool channel_aio_send(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + return meshlink_channel_aio_send(handle, channel, data, len, cb, priv); + } + /// Get the amount of bytes in the send buffer. /** This returns the amount of bytes in the send buffer. * These bytes have not been received by the peer yet. diff --git a/src/meshlink.c b/src/meshlink.c index 20dad9f4..27fe945c 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -1387,7 +1387,7 @@ static void *meshlink_main_loop(void *arg) { } #else - pthread_cond_signal(&mesh->cond); + pthread_cond_signal(&mesh->cond); return NULL; #endif // HAVE_SETNS } @@ -2890,16 +2890,45 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { node_t *n = channel->node; meshlink_handle_t *mesh = n->mesh; + meshlink_aio_buffer_t *aio = channel->aio; + + if(aio) { + /* We at least one AIO buffer. Send as much as possible form the first buffer. */ + size_t left = aio->len - aio->done; + + if(len > left) { + len = left; + } + + ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len); + + if(sent >= 0) { + aio->done += sent; + } + + /* If the buffer is now completely sent, call the callback and dispose of it. */ + if(aio->done >= aio->len) { + channel->aio = aio->next; + + if(aio->cb) { + aio->cb(mesh, channel, aio->data, aio->len, aio->priv); + } - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, len); + free(aio); + } + } else { + if(channel->poll_cb) { + channel->poll_cb(mesh, channel, len); + } else { + utcp_set_poll_cb(connection, NULL); + } } } void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { (void)mesh; channel->poll_cb = cb; - utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL); + utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL); } void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) { @@ -2982,6 +3011,18 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel } utcp_close(channel->c); + + /* Clean up any outstanding AIO buffers. */ + for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) { + next = aio->next; + + if(aio->cb) { + aio->cb(mesh, channel, aio->data, aio->len, aio->priv); + } + + free(aio); + } + free(channel); } @@ -3005,8 +3046,17 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann // Then, preferably only if there is room in the receiver window, // kick the meshlink thread to go send packets. + ssize_t retval; + pthread_mutex_lock(&mesh->mesh_mutex); - ssize_t retval = utcp_send(channel->c, data, len); + + /* Disallow direct calls to utcp_send() while we still have AIO active. */ + if(channel->aio) { + retval = 0; + } else { + retval = utcp_send(channel->c, data, len); + } + pthread_mutex_unlock(&mesh->mesh_mutex); if(retval < 0) { @@ -3016,6 +3066,43 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann return retval; } +bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || !data) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->data = data; + aio->len = len; + aio->cb = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + /* Ensure the poll callback is set, and call it right now to push data if possible */ + utcp_set_poll_cb(channel->c, channel_poll); + channel_poll(channel->c, len); + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL; diff --git a/src/meshlink.h b/src/meshlink.h index 40deb5f1..5c5196ab 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -1125,6 +1125,34 @@ extern void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t * */ extern ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len); +/// A callback for cleaning up buffers submitted for asynchronous I/O. +/** This callbacks signals that MeshLink has finished using this buffer. + * The ownership of the buffer is now back into the application's hands. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel which used this buffer. + * @param data A pointer to a buffer containing the enqueued data. + * @param len The length of the buffer. + * @param priv A private pointer which was set by the application when submitting the buffer. +}; + */ +typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv); + +/// Transmit data on a channel asynchronously +/** This queues data to send to the remote node. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel. + * @param data A pointer to a buffer containing data sent by the source, or NULL if there is no data to send. + * After meshlink_channel_aio_send() returns, the buffer may not be modified or freed by the application + * until the callback routine is called. + * @param len The length of the data, or 0 if there is no data to send. + * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. + * + * @return True if the buffer was enqueued, false otherwise. + */ +extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv); + /// Get channel flags. /** This returns the flags used when opening this channel. * diff --git a/src/meshlink.sym b/src/meshlink.sym index 6bf27627..05fb66c2 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -9,6 +9,7 @@ devtool_trybind_probe meshlink_add_address meshlink_add_external_address meshlink_blacklist +meshlink_channel_aio_send meshlink_channel_close meshlink_channel_get_flags meshlink_channel_get_recvq diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index 7e2955eb..0e3581f5 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -205,12 +205,23 @@ struct meshlink_submesh { void *priv; }; +/// An AIO buffer. +typedef struct meshlink_aio_buffer { + const void *data; + size_t len; + size_t done; + meshlink_aio_cb_t cb; + void *priv; + struct meshlink_aio_buffer *next; +} meshlink_aio_buffer_t; + /// A channel. struct meshlink_channel { struct node_t *node; void *priv; struct utcp_connection *c; + meshlink_aio_buffer_t *aio; meshlink_channel_receive_cb_t receive_cb; meshlink_channel_poll_cb_t poll_cb; }; -- 2.39.5