From 57114d942004e8a34ff22aadc0c620a0aabbb423 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Mon, 12 Aug 2019 16:46:02 +0200 Subject: [PATCH] Add meshlink_channel_aio_receive(). This function allows handing over a large buffer to MeshLink which will be used to receive data without needing intervention from the application. A callback is called when MeshLink has filled the buffer with the data. --- src/meshlink++.h | 22 +++++++++- src/meshlink.c | 94 +++++++++++++++++++++++++++++++++++++---- src/meshlink.h | 21 ++++++++- src/meshlink.sym | 1 + src/meshlink_internal.h | 3 +- test/channels-aio.c | 23 ++++++---- 6 files changed, 144 insertions(+), 20 deletions(-) diff --git a/src/meshlink++.h b/src/meshlink++.h index fbefcea8..3666f67d 100644 --- a/src/meshlink++.h +++ b/src/meshlink++.h @@ -735,7 +735,9 @@ public: } /// Transmit data on a channel asynchronously - /** This queues data to send to the remote node. + /** This registers a buffer that will be used to send data to the remote node. + * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered. + * While there are still buffers with unsent data, the poll callback will not be called. * * @param channel A handle for the channel. * @param data A pointer to a buffer containing data sent by the source, or NULL if there is no data to send. @@ -750,6 +752,24 @@ public: return meshlink_channel_aio_send(handle, channel, data, len, cb, priv); } + /// Receive data on a channel asynchronously + /** This registers a buffer that will be filled with incoming channel data. + * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered. + * While there are still buffers that have not been filled, the receive callback will not be called. + * + * @param channel A handle for the channel. + * @param data A pointer to a buffer that will be filled with incoming data. + * After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application + * until the callback routine is called. + * @param len The length of the data. + * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. + * + * @return True if the buffer was enqueued, false otherwise. + */ + bool channel_aio_receive(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv); + } + /// Get the amount of bytes in the send buffer. /** This returns the amount of bytes in the send buffer. * These bytes have not been received by the peer yet. diff --git a/src/meshlink.c b/src/meshlink.c index 27fe945c..1b643503 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -2819,8 +2819,43 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data if(n->status.destroyed) { meshlink_channel_close(mesh, channel); - } else if(channel->receive_cb) { - channel->receive_cb(mesh, channel, data, len); + return len; + } + + const char *p = data; + size_t left = len; + + while(channel->aio_receive) { + meshlink_aio_buffer_t *aio = channel->aio_receive; + size_t todo = aio->len - aio->done; + + if(todo > left) { + todo = left; + } + + memcpy((char *)aio->data + aio->done, p, todo); + aio->done += todo; + + if(aio->done == aio->len) { + channel->aio_receive = aio->next; + + if(aio->cb) { + aio->cb(mesh, channel, aio->data, aio->len, aio->priv); + } + + free(aio); + } + + p += todo; + left -= todo; + + if(!left && len) { + return len; + } + } + + if(channel->receive_cb) { + channel->receive_cb(mesh, channel, p, left); } return len; @@ -2890,7 +2925,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { node_t *n = channel->node; meshlink_handle_t *mesh = n->mesh; - meshlink_aio_buffer_t *aio = channel->aio; + meshlink_aio_buffer_t *aio = channel->aio_send; if(aio) { /* We at least one AIO buffer. Send as much as possible form the first buffer. */ @@ -2908,7 +2943,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { /* If the buffer is now completely sent, call the callback and dispose of it. */ if(aio->done >= aio->len) { - channel->aio = aio->next; + channel->aio_send = aio->next; if(aio->cb) { aio->cb(mesh, channel, aio->data, aio->len, aio->priv); @@ -2928,7 +2963,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { (void)mesh; channel->poll_cb = cb; - utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL); + utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL); } void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) { @@ -3013,7 +3048,17 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel utcp_close(channel->c); /* Clean up any outstanding AIO buffers. */ - for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) { + for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { + next = aio->next; + + if(aio->cb) { + aio->cb(mesh, channel, aio->data, aio->len, aio->priv); + } + + free(aio); + } + + for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { next = aio->next; if(aio->cb) { @@ -3051,7 +3096,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann pthread_mutex_lock(&mesh->mesh_mutex); /* Disallow direct calls to utcp_send() while we still have AIO active. */ - if(channel->aio) { + if(channel->aio_send) { retval = 0; } else { retval = utcp_send(channel->c, data, len); @@ -3086,7 +3131,7 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan pthread_mutex_lock(&mesh->mesh_mutex); /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio; + meshlink_aio_buffer_t **p = &channel->aio_send; while(*p) { p = &(*p)->next; @@ -3103,6 +3148,39 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan return true; } +bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || !data) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->data = data; + aio->len = len; + aio->cb = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_receive; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL; diff --git a/src/meshlink.h b/src/meshlink.h index 5c5196ab..31583e51 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -1139,7 +1139,9 @@ extern ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv); /// Transmit data on a channel asynchronously -/** This queues data to send to the remote node. +/** This registers a buffer that will be used to send data to the remote node. + * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered. + * While there are still buffers with unsent data, the poll callback will not be called. * * @param mesh A handle which represents an instance of MeshLink. * @param channel A handle for the channel. @@ -1153,6 +1155,23 @@ typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *c */ extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv); +/// Receive data on a channel asynchronously +/** This registers a buffer that will be filled with incoming channel data. + * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered. + * While there are still buffers that have not been filled, the receive callback will not be called. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel. + * @param data A pointer to a buffer that will be filled with incoming data. + * After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application + * until the callback routine is called. + * @param len The length of the data. + * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. + * + * @return True if the buffer was enqueued, false otherwise. + */ +extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv); + /// Get channel flags. /** This returns the flags used when opening this channel. * diff --git a/src/meshlink.sym b/src/meshlink.sym index 05fb66c2..33cb5a1c 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -9,6 +9,7 @@ devtool_trybind_probe meshlink_add_address meshlink_add_external_address meshlink_blacklist +meshlink_channel_aio_receive meshlink_channel_aio_send meshlink_channel_close meshlink_channel_get_flags diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index 0e3581f5..68de758a 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -221,7 +221,8 @@ struct meshlink_channel { void *priv; struct utcp_connection *c; - meshlink_aio_buffer_t *aio; + meshlink_aio_buffer_t *aio_send; + meshlink_aio_buffer_t *aio_receive; meshlink_channel_receive_cb_t receive_cb; meshlink_channel_poll_cb_t poll_cb; }; diff --git a/test/channels-aio.c b/test/channels-aio.c index ca79cce9..ef4c0a3f 100644 --- a/test/channels-aio.c +++ b/test/channels-aio.c @@ -10,6 +10,7 @@ static const size_t size = 10000000; // size of data to transfer static bool bar_reachable = false; +static int bar_callbacks = 0; static int foo_callbacks = 0; static size_t bar_received = 0; static struct sync_flag bar_finished_flag; @@ -50,12 +51,14 @@ void foo_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void foo_callbacks++; } -void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { +void bar_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { + (void)mesh; (void)channel; + (void)data; + (void)len; + (void)priv; - char *indata = mesh->priv; - memcpy(indata, data, len); - mesh->priv = indata + len; + bar_callbacks++; bar_received += len; if(bar_received >= size) { @@ -75,12 +78,13 @@ bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t po bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { assert(port == 7); + assert(!data); + assert(!len); + char *outdata = mesh->priv; - meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb); - - if(data) { - bar_receive_cb(mesh, channel, data, len); - } + meshlink_set_channel_receive_cb(mesh, channel, NULL); + assert(meshlink_channel_aio_receive(mesh, channel, outdata, size / 4, bar_aio_cb, NULL)); + assert(meshlink_channel_aio_receive(mesh, channel, outdata + size / 4, size - size / 4, bar_aio_cb, NULL)); return true; } @@ -173,6 +177,7 @@ int main(int argc, char *argv[]) { assert(wait_sync_flag(&bar_finished_flag, 10)); assert(foo_callbacks == 2); + assert(bar_callbacks == 2); assert(bar_received == size); assert(!memcmp(indata, outdata, size)); -- 2.39.5