From: Guus Sliepen Date: Thu, 29 Aug 2019 21:26:29 +0000 (+0200) Subject: Add support for AIO using filedescriptors. X-Git-Url: http://git.meshlink.io/?p=meshlink;a=commitdiff_plain;h=c023ad12147aa88810629c110ea6b1ab94267196 Add support for AIO using filedescriptors. This adds support to enqueue transmits between channels and filedescriptors. Currently, it requires that read() and write() calls on the filedescriptors are non-blocking and always succeed, which limits it to reading from and writing to files. --- diff --git a/src/meshlink++.h b/src/meshlink++.h index dee2c706..7877aeca 100644 --- a/src/meshlink++.h +++ b/src/meshlink++.h @@ -110,6 +110,17 @@ typedef void (*channel_poll_cb_t)(mesh *mesh, channel *channel, size_t len); */ typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv); +/// A callback for asynchronous I/O to and from filedescriptors. +/** This callbacks signals that MeshLink has finished using this filedescriptor. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel which used this filedescriptor. + * @param fd The filedescriptor that was used. + * @param len The length of the data that was successfully sent or received. + * @param priv A private pointer which was set by the application when submitting the buffer. + */ +typedef void (*aio_fd_cb_t)(mesh *mesh, channel *channel, int fd, size_t len, void *priv); + /// A class describing a MeshLink node. class node: public meshlink_node_t { }; @@ -776,6 +787,22 @@ public: return meshlink_channel_aio_send(handle, channel, data, len, cb, priv); } + /// Transmit data on a channel asynchronously from a filedescriptor + /** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel. + * The callback may be returned early if there is an error reading from the filedescriptor. + * While there is still with unsent data, the poll callback will not be called. + * + * @param channel A handle for the channel. + * @param fd A file descriptor from which data will be read. + * @param len The length of the data, or 0 if there is no data to send. + * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. + * + * @return True if the buffer was enqueued, false otherwise. + */ + bool channel_aio_fd_send(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + return meshlink_channel_aio_fd_send(handle, channel, fd, len, cb, priv); + } + /// Receive data on a channel asynchronously /** This registers a buffer that will be filled with incoming channel data. * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered. @@ -794,6 +821,22 @@ public: return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv); } + /// Receive data on a channel asynchronously and send it to a filedescriptor + /** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor. + * The callback may be returned early if there is an error writing to the filedescriptor. + * While there is still unread data, the receive callback will not be called. + * + * @param channel A handle for the channel. + * @param fd A file descriptor to which data will be written. + * @param len The length of the data. + * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. + * + * @return True if the buffer was enqueued, false otherwise. + */ + bool channel_aio_fd_receive(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + return meshlink_channel_aio_fd_receive(handle, channel, fd, len, cb, priv); + } + /// Get the amount of bytes in the send buffer. /** This returns the amount of bytes in the send buffer. * These bytes have not been received by the peer yet. diff --git a/src/meshlink.c b/src/meshlink.c index 1d33a589..42573c8f 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -2807,6 +2807,18 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { return mesh->channel_accept_cb; } +static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) { + if(aio->data) { + if(aio->cb.buffer) { + aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); + } + } else { + if(aio->cb.fd) { + aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + } + } +} + static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -2833,16 +2845,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data todo = left; } - memcpy((char *)aio->data + aio->done, p, todo); + if(aio->data) { + memcpy((char *)aio->data + aio->done, p, todo); + } else { + ssize_t result = write(aio->fd, p, todo); + + if(result > 0) { + todo = result; + } + } + aio->done += todo; if(aio->done == aio->len) { channel->aio_receive = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } @@ -2930,12 +2947,34 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { if(aio) { /* We at least one AIO buffer. Send as much as possible form the first buffer. */ size_t left = aio->len - aio->done; + ssize_t sent; if(len > left) { len = left; } - ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len); + if(aio->data) { + sent = utcp_send(connection, (char *)aio->data + aio->done, len); + } else { + char buf[65536]; + size_t todo = utcp_get_sndbuf_free(connection); + + if(todo > left) { + todo = left; + } + + if(todo > sizeof(buf)) { + todo = sizeof(buf); + } + + ssize_t result = read(aio->fd, buf, todo); + + if(result > 0) { + sent = utcp_send(connection, buf, result); + } else { + sent = result; + } + } if(sent >= 0) { aio->done += sent; @@ -2944,11 +2983,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { /* If the buffer is now completely sent, call the callback and dispose of it. */ if(aio->done >= aio->len) { channel->aio_send = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } } else { @@ -3076,21 +3111,13 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel /* Clean up any outstanding AIO buffers. */ for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { next = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { next = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } @@ -3151,7 +3178,44 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); aio->data = data; aio->len = len; - aio->cb = cb; + aio->cb.buffer = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_send; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + /* Ensure the poll callback is set, and call it right now to push data if possible */ + utcp_set_poll_cb(channel->c, channel_poll); + channel_poll(channel->c, len); + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || fd == -1) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->fd = fd; + aio->len = len; + aio->cb.fd = cb; aio->priv = priv; pthread_mutex_lock(&mesh->mesh_mutex); @@ -3188,7 +3252,40 @@ bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *c meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); aio->data = data; aio->len = len; - aio->cb = cb; + aio->cb.buffer = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_receive; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || fd == -1) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->fd = fd; + aio->len = len; + aio->cb.fd = cb; aio->priv = priv; pthread_mutex_lock(&mesh->mesh_mutex); diff --git a/src/meshlink.h b/src/meshlink.h index 075d5f16..06f87913 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -1156,10 +1156,20 @@ extern ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t * @param data A pointer to a buffer containing the enqueued data. * @param len The length of the buffer. * @param priv A private pointer which was set by the application when submitting the buffer. -}; */ typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv); +/// A callback for asynchronous I/O to and from filedescriptors. +/** This callbacks signals that MeshLink has finished using this filedescriptor. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel which used this filedescriptor. + * @param fd The filedescriptor that was used. + * @param len The length of the data that was successfully sent or received. + * @param priv A private pointer which was set by the application when submitting the buffer. + */ +typedef void (*meshlink_aio_fd_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv); + /// Transmit data on a channel asynchronously /** This registers a buffer that will be used to send data to the remote node. * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered. @@ -1177,6 +1187,21 @@ typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *c */ extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv); +/// Transmit data on a channel asynchronously from a filedescriptor +/** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel. + * The callback may be returned early if there is an error reading from the filedescriptor. + * While there is still with unsent data, the poll callback will not be called. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel. + * @param fd A file descriptor from which data will be read. + * @param len The length of the data, or 0 if there is no data to send. + * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. + * + * @return True if the buffer was enqueued, false otherwise. + */ +extern bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv); + /// Receive data on a channel asynchronously /** This registers a buffer that will be filled with incoming channel data. * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered. @@ -1194,6 +1219,21 @@ extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_ */ extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv); +/// Receive data on a channel asynchronously and send it to a filedescriptor +/** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor. + * The callback may be returned early if there is an error writing to the filedescriptor. + * While there is still unread data, the receive callback will not be called. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param channel A handle for the channel. + * @param fd A file descriptor to which data will be written. + * @param len The length of the data. + * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. + * + * @return True if the buffer was enqueued, false otherwise. + */ +extern bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv); + /// Get channel flags. /** This returns the flags used when opening this channel. * diff --git a/src/meshlink.sym b/src/meshlink.sym index ed2168da..f6bce730 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -9,6 +9,8 @@ devtool_trybind_probe meshlink_add_address meshlink_add_external_address meshlink_blacklist +meshlink_channel_aio_fd_receive +meshlink_channel_aio_fd_send meshlink_channel_aio_receive meshlink_channel_aio_send meshlink_channel_close diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index 68de758a..569628cc 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -208,9 +208,13 @@ struct meshlink_submesh { /// An AIO buffer. typedef struct meshlink_aio_buffer { const void *data; + int fd; size_t len; size_t done; - meshlink_aio_cb_t cb; + union { + meshlink_aio_cb_t buffer; + meshlink_aio_fd_cb_t fd; + } cb; void *priv; struct meshlink_aio_buffer *next; } meshlink_aio_buffer_t; diff --git a/test/Makefile.am b/test/Makefile.am index 77cee74a..f006d132 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -3,6 +3,7 @@ TESTS = \ basicpp.test \ channels.test \ channels-aio.test \ + channels-aio-fd.test \ channels-cornercases.test \ channels-failure.test \ channels-fork.test \ @@ -30,6 +31,7 @@ check_PROGRAMS = \ basicpp \ channels \ channels-aio \ + channels-aio-fd \ channels-cornercases \ channels-failure \ channels-fork \ @@ -60,6 +62,9 @@ channels_LDADD = ../src/libmeshlink.la channels_aio_SOURCES = channels-aio.c utils.c utils.h channels_aio_LDADD = ../src/libmeshlink.la +channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h +channels_aio_fd_LDADD = ../src/libmeshlink.la + channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h channels_no_partial_LDADD = ../src/libmeshlink.la diff --git a/test/channels-aio-fd.c b/test/channels-aio-fd.c new file mode 100644 index 00000000..c793db83 --- /dev/null +++ b/test/channels-aio-fd.c @@ -0,0 +1,204 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "meshlink.h" +#include "utils.h" + +static const size_t size = 1024 * 1024; // size of data to transfer +static const size_t nchannels = 4; // number of simultaneous channels + +struct aio_info { + int callbacks; + size_t size; + struct timeval tv; + struct sync_flag flag; +}; + +struct channel_info { + FILE *file; + struct aio_info aio_infos[2]; +}; + +static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) { + (void)mesh; + (void)channel; + (void)fd; + (void)len; + + struct aio_info *info = priv; + gettimeofday(&info->tv, NULL); + info->callbacks++; + info->size += len; + set_sync_flag(&info->flag, true); +} + +static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + (void)mesh; + (void)channel; + (void)port; + (void)data; + (void)len; + + return false; +} + +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + assert(port && port <= nchannels); + assert(!data); + assert(!len); + + struct channel_info *infos = mesh->priv; + struct channel_info *info = &infos[port - 1]; + + assert(meshlink_channel_aio_fd_receive(mesh, channel, fileno(info->file), size / 4, aio_fd_cb, &info->aio_infos[0])); + assert(meshlink_channel_aio_fd_receive(mesh, channel, fileno(info->file), size - size / 4, aio_fd_cb, &info->aio_infos[1])); + + return true; +} + +int main(int argc, char *argv[]) { + (void)argc; + (void)argv; + + // Prepare file + + char *outdata = malloc(size); + + assert(outdata); + + for(size_t i = 0; i < size; i++) { + // Human readable output + outdata[i] = i % 96 ? i % 96 + 32 : '\n'; + } + + FILE *file = fopen("channels_aio_fd.in", "w"); + assert(file); + assert(fwrite(outdata, size, 1, file) == 1); + assert(fclose(file) == 0); + + struct channel_info in_infos[nchannels]; + struct channel_info out_infos[nchannels]; + + memset(in_infos, 0, sizeof(in_infos)); + memset(out_infos, 0, sizeof(out_infos)); + + for(size_t i = 0; i < nchannels; i++) { + char filename[PATH_MAX]; + snprintf(filename, sizeof(filename), "channels_aio_fd.out%d", (int)i); + in_infos[i].file = fopen(filename, "w"); + assert(in_infos[i].file); + out_infos[i].file = fopen("channels_aio_fd.in", "r"); + assert(out_infos[i].file); + } + + // Open two new meshlink instance. + + meshlink_destroy("channels_aio_fd_conf.1"); + meshlink_destroy("channels_aio_fd_conf.2"); + + meshlink_handle_t *mesh1 = meshlink_open("channels_aio_fd_conf.1", "foo", "channels", DEV_CLASS_BACKBONE); + assert(mesh1); + + meshlink_handle_t *mesh2 = meshlink_open("channels_aio_fd_conf.2", "bar", "channels", DEV_CLASS_BACKBONE); + assert(mesh2); + + mesh2->priv = in_infos; + + meshlink_enable_discovery(mesh1, false); + meshlink_enable_discovery(mesh2, false); + + // Import and export both side's data + + meshlink_add_address(mesh1, "localhost"); + + char *data = meshlink_export(mesh1); + assert(data); + assert(meshlink_import(mesh2, data)); + free(data); + + data = meshlink_export(mesh2); + assert(data); + assert(meshlink_import(mesh1, data)); + free(data); + + // Set the callbacks. + + meshlink_set_channel_accept_cb(mesh1, reject_cb); + meshlink_set_channel_accept_cb(mesh2, accept_cb); + + // Start both instances + + assert(meshlink_start(mesh1)); + assert(meshlink_start(mesh2)); + + // Open channels from foo to bar. + + meshlink_node_t *bar = meshlink_get_node(mesh1, "bar"); + assert(bar); + + meshlink_channel_t *channels[nchannels]; + + for(size_t i = 0; i < nchannels; i++) { + channels[i] = meshlink_channel_open(mesh1, bar, i + 1, NULL, NULL, 0); + assert(channels[i]); + } + + // Send a large buffer of data on each channel. + + for(size_t i = 0; i < nchannels; i++) { + assert(meshlink_channel_aio_fd_send(mesh1, channels[i], fileno(out_infos[i].file), size / 3, aio_fd_cb, &out_infos[i].aio_infos[0])); + assert(meshlink_channel_aio_fd_send(mesh1, channels[i], fileno(out_infos[i].file), size - size / 3, aio_fd_cb, &out_infos[i].aio_infos[1])); + } + + // Wait for everyone to finish. + + for(size_t i = 0; i < nchannels; i++) { + assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10)); + assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10)); + assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10)); + assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10)); + } + + // Check that everything is correct. + + for(size_t i = 0; i < nchannels; i++) { + assert(fclose(in_infos[i].file) == 0); + assert(fclose(out_infos[i].file) == 0); + + // One callback for each AIO buffer. + assert(out_infos[i].aio_infos[0].callbacks == 1); + assert(out_infos[i].aio_infos[1].callbacks == 1); + assert(in_infos[i].aio_infos[0].callbacks == 1); + assert(in_infos[i].aio_infos[1].callbacks == 1); + + // Correct size sent and received. + assert(out_infos[i].aio_infos[0].size == size / 3); + assert(out_infos[i].aio_infos[1].size == size - size / 3); + assert(in_infos[i].aio_infos[0].size == size / 4); + assert(in_infos[i].aio_infos[1].size == size - size / 4); + + // First batch of data should all be sent and received before the second batch + for(size_t j = 0; j < nchannels; j++) { + assert(timercmp(&out_infos[i].aio_infos[0].tv, &out_infos[j].aio_infos[1].tv, <=)); + assert(timercmp(&in_infos[i].aio_infos[0].tv, &in_infos[j].aio_infos[1].tv, <=)); + } + + // Files should be identical + char command[PATH_MAX]; + snprintf(command, sizeof(command), "cmp channels_aio_fd.in channels_aio_fd.out%d", (int)i); + assert(system(command) == 0); + + } + + // Clean up. + + meshlink_close(mesh2); + meshlink_close(mesh1); + + return 0; +} diff --git a/test/channels-aio-fd.test b/test/channels-aio-fd.test new file mode 100755 index 00000000..c754aa03 --- /dev/null +++ b/test/channels-aio-fd.test @@ -0,0 +1,4 @@ +#!/bin/sh + +rm -Rf channels_aio_fd_conf.* +./channels-aio-fd