]> git.meshlink.io Git - meshlink/commitdiff
Add support for AIO using filedescriptors.
authorGuus Sliepen <guus@meshlink.io>
Thu, 29 Aug 2019 21:26:29 +0000 (23:26 +0200)
committerGuus Sliepen <guus@meshlink.io>
Thu, 29 Aug 2019 21:26:29 +0000 (23:26 +0200)
This adds support to enqueue transmits between channels and filedescriptors.
Currently, it requires that read() and write() calls on the filedescriptors
are non-blocking and always succeed, which limits it to reading from and
writing to files.

src/meshlink++.h
src/meshlink.c
src/meshlink.h
src/meshlink.sym
src/meshlink_internal.h
test/Makefile.am
test/channels-aio-fd.c [new file with mode: 0644]
test/channels-aio-fd.test [new file with mode: 0755]

index dee2c7060d04bc352b59901f911fc7d8a78a7009..7877aeca22becc6d0e32c67abf9e10b67cc99460 100644 (file)
@@ -110,6 +110,17 @@ typedef void (*channel_poll_cb_t)(mesh *mesh, channel *channel, size_t len);
  */
 typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv);
 
  */
 typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv);
 
+/// A callback for asynchronous I/O to and from filedescriptors.
+/** This callbacks signals that MeshLink has finished using this filedescriptor.
+ *
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel which used this filedescriptor.
+ *  @param fd        The filedescriptor that was used.
+ *  @param len       The length of the data that was successfully sent or received.
+ *  @param priv      A private pointer which was set by the application when submitting the buffer.
+ */
+typedef void (*aio_fd_cb_t)(mesh *mesh, channel *channel, int fd, size_t len, void *priv);
+
 /// A class describing a MeshLink node.
 class node: public meshlink_node_t {
 };
 /// A class describing a MeshLink node.
 class node: public meshlink_node_t {
 };
@@ -776,6 +787,22 @@ public:
                return meshlink_channel_aio_send(handle, channel, data, len, cb, priv);
        }
 
                return meshlink_channel_aio_send(handle, channel, data, len, cb, priv);
        }
 
+       /// Transmit data on a channel asynchronously from a filedescriptor
+       /** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel.
+        *  The callback may be returned early if there is an error reading from the filedescriptor.
+        *  While there is still with unsent data, the poll callback will not be called.
+        *
+        *  @param channel      A handle for the channel.
+        *  @param fd           A file descriptor from which data will be read.
+        *  @param len          The length of the data, or 0 if there is no data to send.
+        *  @param cb           A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+        *
+        *  @return             True if the buffer was enqueued, false otherwise.
+        */
+       bool channel_aio_fd_send(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+               return meshlink_channel_aio_fd_send(handle, channel, fd, len, cb, priv);
+       }
+
        /// Receive data on a channel asynchronously
        /** This registers a buffer that will be filled with incoming channel data.
         *  Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
        /// Receive data on a channel asynchronously
        /** This registers a buffer that will be filled with incoming channel data.
         *  Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
@@ -794,6 +821,22 @@ public:
                return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv);
        }
 
                return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv);
        }
 
+       /// Receive data on a channel asynchronously and send it to a filedescriptor
+       /** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor.
+        *  The callback may be returned early if there is an error writing to the filedescriptor.
+        *  While there is still unread data, the receive callback will not be called.
+        *
+        *  @param channel      A handle for the channel.
+        *  @param fd           A file descriptor to which data will be written.
+        *  @param len          The length of the data.
+        *  @param cb           A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+        *
+        *  @return             True if the buffer was enqueued, false otherwise.
+        */
+       bool channel_aio_fd_receive(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+               return meshlink_channel_aio_fd_receive(handle, channel, fd, len, cb, priv);
+       }
+
        /// Get the amount of bytes in the send buffer.
        /** This returns the amount of bytes in the send buffer.
         *  These bytes have not been received by the peer yet.
        /// Get the amount of bytes in the send buffer.
        /** This returns the amount of bytes in the send buffer.
         *  These bytes have not been received by the peer yet.
index 1d33a5897ad07155c8413794a7cbc3f77373d4c4..42573c8ffa6f68837fb1e06f3a567f0baefbdbd9 100644 (file)
@@ -2807,6 +2807,18 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
        return mesh->channel_accept_cb;
 }
 
        return mesh->channel_accept_cb;
 }
 
+static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
+       if(aio->data) {
+               if(aio->cb.buffer) {
+                       aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+               }
+       } else {
+               if(aio->cb.fd) {
+                       aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+               }
+       }
+}
+
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
@@ -2833,16 +2845,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        todo = left;
                }
 
                        todo = left;
                }
 
-               memcpy((char *)aio->data + aio->done, p, todo);
+               if(aio->data) {
+                       memcpy((char *)aio->data + aio->done, p, todo);
+               } else {
+                       ssize_t result = write(aio->fd, p, todo);
+
+                       if(result > 0) {
+                               todo = result;
+                       }
+               }
+
                aio->done += todo;
 
                if(aio->done == aio->len) {
                        channel->aio_receive = aio->next;
                aio->done += todo;
 
                if(aio->done == aio->len) {
                        channel->aio_receive = aio->next;
-
-                       if(aio->cb) {
-                               aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-                       }
-
+                       aio_signal(mesh, channel, aio);
                        free(aio);
                }
 
                        free(aio);
                }
 
@@ -2930,12 +2947,34 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
        if(aio) {
                /* We at least one AIO buffer. Send as much as possible form the first buffer. */
                size_t left = aio->len - aio->done;
        if(aio) {
                /* We at least one AIO buffer. Send as much as possible form the first buffer. */
                size_t left = aio->len - aio->done;
+               ssize_t sent;
 
                if(len > left) {
                        len = left;
                }
 
 
                if(len > left) {
                        len = left;
                }
 
-               ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+               if(aio->data) {
+                       sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+               } else {
+                       char buf[65536];
+                       size_t todo = utcp_get_sndbuf_free(connection);
+
+                       if(todo > left) {
+                               todo = left;
+                       }
+
+                       if(todo > sizeof(buf)) {
+                               todo = sizeof(buf);
+                       }
+
+                       ssize_t result = read(aio->fd, buf, todo);
+
+                       if(result > 0) {
+                               sent = utcp_send(connection, buf, result);
+                       } else {
+                               sent = result;
+                       }
+               }
 
                if(sent >= 0) {
                        aio->done += sent;
 
                if(sent >= 0) {
                        aio->done += sent;
@@ -2944,11 +2983,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                /* If the buffer is now completely sent, call the callback and dispose of it. */
                if(aio->done >= aio->len) {
                        channel->aio_send = aio->next;
                /* If the buffer is now completely sent, call the callback and dispose of it. */
                if(aio->done >= aio->len) {
                        channel->aio_send = aio->next;
-
-                       if(aio->cb) {
-                               aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-                       }
-
+                       aio_signal(mesh, channel, aio);
                        free(aio);
                }
        } else {
                        free(aio);
                }
        } else {
@@ -3076,21 +3111,13 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
        /* Clean up any outstanding AIO buffers. */
        for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
                next = aio->next;
        /* Clean up any outstanding AIO buffers. */
        for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
                next = aio->next;
-
-               if(aio->cb) {
-                       aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-               }
-
+               aio_signal(mesh, channel, aio);
                free(aio);
        }
 
        for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
                next = aio->next;
                free(aio);
        }
 
        for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
                next = aio->next;
-
-               if(aio->cb) {
-                       aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-               }
-
+               aio_signal(mesh, channel, aio);
                free(aio);
        }
 
                free(aio);
        }
 
@@ -3151,7 +3178,44 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
        meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
        aio->data = data;
        aio->len = len;
        meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
        aio->data = data;
        aio->len = len;
-       aio->cb = cb;
+       aio->cb.buffer = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_send;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       /* Ensure the poll callback is set, and call it right now to push data if possible */
+       utcp_set_poll_cb(channel->c, channel_poll);
+       channel_poll(channel->c, len);
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || fd == -1) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->fd = fd;
+       aio->len = len;
+       aio->cb.fd = cb;
        aio->priv = priv;
 
        pthread_mutex_lock(&mesh->mesh_mutex);
        aio->priv = priv;
 
        pthread_mutex_lock(&mesh->mesh_mutex);
@@ -3188,7 +3252,40 @@ bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *c
        meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
        aio->data = data;
        aio->len = len;
        meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
        aio->data = data;
        aio->len = len;
-       aio->cb = cb;
+       aio->cb.buffer = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || fd == -1) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->fd = fd;
+       aio->len = len;
+       aio->cb.fd = cb;
        aio->priv = priv;
 
        pthread_mutex_lock(&mesh->mesh_mutex);
        aio->priv = priv;
 
        pthread_mutex_lock(&mesh->mesh_mutex);
index 075d5f16fee33618fbfa130f3862841f826605fd..06f8791334d67ec8378aa45c15fec9ad4baac1be 100644 (file)
@@ -1156,10 +1156,20 @@ extern ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t
  *  @param data      A pointer to a buffer containing the enqueued data.
  *  @param len       The length of the buffer.
  *  @param priv      A private pointer which was set by the application when submitting the buffer.
  *  @param data      A pointer to a buffer containing the enqueued data.
  *  @param len       The length of the buffer.
  *  @param priv      A private pointer which was set by the application when submitting the buffer.
-};
  */
 typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv);
 
  */
 typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv);
 
+/// A callback for asynchronous I/O to and from filedescriptors.
+/** This callbacks signals that MeshLink has finished using this filedescriptor.
+ *
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel which used this filedescriptor.
+ *  @param fd        The filedescriptor that was used.
+ *  @param len       The length of the data that was successfully sent or received.
+ *  @param priv      A private pointer which was set by the application when submitting the buffer.
+ */
+typedef void (*meshlink_aio_fd_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv);
+
 /// Transmit data on a channel asynchronously
 /** This registers a buffer that will be used to send data to the remote node.
  *  Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
 /// Transmit data on a channel asynchronously
 /** This registers a buffer that will be used to send data to the remote node.
  *  Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
@@ -1177,6 +1187,21 @@ typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *c
  */
 extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
 
  */
 extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
 
+/// Transmit data on a channel asynchronously from a filedescriptor
+/** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel.
+ *  The callback may be returned early if there is an error reading from the filedescriptor.
+ *  While there is still with unsent data, the poll callback will not be called.
+ *
+ *  @param mesh         A handle which represents an instance of MeshLink.
+ *  @param channel      A handle for the channel.
+ *  @param fd           A file descriptor from which data will be read.
+ *  @param len          The length of the data, or 0 if there is no data to send.
+ *  @param cb           A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+ *
+ *  @return             True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv);
+
 /// Receive data on a channel asynchronously
 /** This registers a buffer that will be filled with incoming channel data.
  *  Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
 /// Receive data on a channel asynchronously
 /** This registers a buffer that will be filled with incoming channel data.
  *  Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
@@ -1194,6 +1219,21 @@ extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_
  */
 extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
 
  */
 extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
 
+/// Receive data on a channel asynchronously and send it to a filedescriptor
+/** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor.
+ *  The callback may be returned early if there is an error writing to the filedescriptor.
+ *  While there is still unread data, the receive callback will not be called.
+ *
+ *  @param mesh         A handle which represents an instance of MeshLink.
+ *  @param channel      A handle for the channel.
+ *  @param fd           A file descriptor to which data will be written.
+ *  @param len          The length of the data.
+ *  @param cb           A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+ *
+ *  @return             True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv);
+
 /// Get channel flags.
 /** This returns the flags used when opening this channel.
  *
 /// Get channel flags.
 /** This returns the flags used when opening this channel.
  *
index ed2168da4e1a024ffb96f94e6b0dd8dead73c2cb..f6bce730018f2e6183c33731216e9c8684706fff 100644 (file)
@@ -9,6 +9,8 @@ devtool_trybind_probe
 meshlink_add_address
 meshlink_add_external_address
 meshlink_blacklist
 meshlink_add_address
 meshlink_add_external_address
 meshlink_blacklist
+meshlink_channel_aio_fd_receive
+meshlink_channel_aio_fd_send
 meshlink_channel_aio_receive
 meshlink_channel_aio_send
 meshlink_channel_close
 meshlink_channel_aio_receive
 meshlink_channel_aio_send
 meshlink_channel_close
index 68de758af73063d5b006c4d3fbc4d3b76458445a..569628cc3e6ebc05dca19f47d978c7e299c73961 100644 (file)
@@ -208,9 +208,13 @@ struct meshlink_submesh {
 /// An AIO buffer.
 typedef struct meshlink_aio_buffer {
        const void *data;
 /// An AIO buffer.
 typedef struct meshlink_aio_buffer {
        const void *data;
+       int fd;
        size_t len;
        size_t done;
        size_t len;
        size_t done;
-       meshlink_aio_cb_t cb;
+       union {
+               meshlink_aio_cb_t buffer;
+               meshlink_aio_fd_cb_t fd;
+       } cb;
        void *priv;
        struct meshlink_aio_buffer *next;
 } meshlink_aio_buffer_t;
        void *priv;
        struct meshlink_aio_buffer *next;
 } meshlink_aio_buffer_t;
index 77cee74a7c72ef1261e784cd115b1c4c51de79cc..f006d13253db075705748afbef51a7c27a28bc94 100644 (file)
@@ -3,6 +3,7 @@ TESTS = \
        basicpp.test \
        channels.test \
        channels-aio.test \
        basicpp.test \
        channels.test \
        channels-aio.test \
+       channels-aio-fd.test \
        channels-cornercases.test \
        channels-failure.test \
        channels-fork.test \
        channels-cornercases.test \
        channels-failure.test \
        channels-fork.test \
@@ -30,6 +31,7 @@ check_PROGRAMS = \
        basicpp \
        channels \
        channels-aio \
        basicpp \
        channels \
        channels-aio \
+       channels-aio-fd \
        channels-cornercases \
        channels-failure \
        channels-fork \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -60,6 +62,9 @@ channels_LDADD = ../src/libmeshlink.la
 channels_aio_SOURCES = channels-aio.c utils.c utils.h
 channels_aio_LDADD = ../src/libmeshlink.la
 
 channels_aio_SOURCES = channels-aio.c utils.c utils.h
 channels_aio_LDADD = ../src/libmeshlink.la
 
+channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h
+channels_aio_fd_LDADD = ../src/libmeshlink.la
+
 channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
 channels_no_partial_LDADD = ../src/libmeshlink.la
 
 channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
 channels_no_partial_LDADD = ../src/libmeshlink.la
 
diff --git a/test/channels-aio-fd.c b/test/channels-aio-fd.c
new file mode 100644 (file)
index 0000000..c793db8
--- /dev/null
@@ -0,0 +1,204 @@
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <limits.h>
+
+#include "meshlink.h"
+#include "utils.h"
+
+static const size_t size = 1024 * 1024; // size of data to transfer
+static const size_t nchannels = 4; // number of simultaneous channels
+
+struct aio_info {
+       int callbacks;
+       size_t size;
+       struct timeval tv;
+       struct sync_flag flag;
+};
+
+struct channel_info {
+       FILE *file;
+       struct aio_info aio_infos[2];
+};
+
+static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
+       (void)mesh;
+       (void)channel;
+       (void)fd;
+       (void)len;
+
+       struct aio_info *info = priv;
+       gettimeofday(&info->tv, NULL);
+       info->callbacks++;
+       info->size += len;
+       set_sync_flag(&info->flag, true);
+}
+
+static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       (void)mesh;
+       (void)channel;
+       (void)port;
+       (void)data;
+       (void)len;
+
+       return false;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       assert(port && port <= nchannels);
+       assert(!data);
+       assert(!len);
+
+       struct channel_info *infos = mesh->priv;
+       struct channel_info *info = &infos[port - 1];
+
+       assert(meshlink_channel_aio_fd_receive(mesh, channel, fileno(info->file), size / 4, aio_fd_cb, &info->aio_infos[0]));
+       assert(meshlink_channel_aio_fd_receive(mesh, channel, fileno(info->file), size - size / 4, aio_fd_cb, &info->aio_infos[1]));
+
+       return true;
+}
+
+int main(int argc, char *argv[]) {
+       (void)argc;
+       (void)argv;
+
+       // Prepare file
+
+       char *outdata = malloc(size);
+
+       assert(outdata);
+
+       for(size_t i = 0; i < size; i++) {
+               // Human readable output
+               outdata[i] = i % 96 ? i % 96 + 32 : '\n';
+       }
+
+       FILE *file = fopen("channels_aio_fd.in", "w");
+       assert(file);
+       assert(fwrite(outdata, size, 1, file) == 1);
+       assert(fclose(file) == 0);
+
+       struct channel_info in_infos[nchannels];
+       struct channel_info out_infos[nchannels];
+
+       memset(in_infos, 0, sizeof(in_infos));
+       memset(out_infos, 0, sizeof(out_infos));
+
+       for(size_t i = 0; i < nchannels; i++) {
+               char filename[PATH_MAX];
+               snprintf(filename, sizeof(filename), "channels_aio_fd.out%d", (int)i);
+               in_infos[i].file = fopen(filename, "w");
+               assert(in_infos[i].file);
+               out_infos[i].file = fopen("channels_aio_fd.in", "r");
+               assert(out_infos[i].file);
+       }
+
+       // Open two new meshlink instance.
+
+       meshlink_destroy("channels_aio_fd_conf.1");
+       meshlink_destroy("channels_aio_fd_conf.2");
+
+       meshlink_handle_t *mesh1 = meshlink_open("channels_aio_fd_conf.1", "foo", "channels", DEV_CLASS_BACKBONE);
+       assert(mesh1);
+
+       meshlink_handle_t *mesh2 = meshlink_open("channels_aio_fd_conf.2", "bar", "channels", DEV_CLASS_BACKBONE);
+       assert(mesh2);
+
+       mesh2->priv = in_infos;
+
+       meshlink_enable_discovery(mesh1, false);
+       meshlink_enable_discovery(mesh2, false);
+
+       // Import and export both side's data
+
+       meshlink_add_address(mesh1, "localhost");
+
+       char *data = meshlink_export(mesh1);
+       assert(data);
+       assert(meshlink_import(mesh2, data));
+       free(data);
+
+       data = meshlink_export(mesh2);
+       assert(data);
+       assert(meshlink_import(mesh1, data));
+       free(data);
+
+       // Set the callbacks.
+
+       meshlink_set_channel_accept_cb(mesh1, reject_cb);
+       meshlink_set_channel_accept_cb(mesh2, accept_cb);
+
+       // Start both instances
+
+       assert(meshlink_start(mesh1));
+       assert(meshlink_start(mesh2));
+
+       // Open channels from foo to bar.
+
+       meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
+       assert(bar);
+
+       meshlink_channel_t *channels[nchannels];
+
+       for(size_t i = 0; i < nchannels; i++) {
+               channels[i] = meshlink_channel_open(mesh1, bar, i + 1, NULL, NULL, 0);
+               assert(channels[i]);
+       }
+
+       // Send a large buffer of data on each channel.
+
+       for(size_t i = 0; i < nchannels; i++) {
+               assert(meshlink_channel_aio_fd_send(mesh1, channels[i], fileno(out_infos[i].file), size / 3, aio_fd_cb, &out_infos[i].aio_infos[0]));
+               assert(meshlink_channel_aio_fd_send(mesh1, channels[i], fileno(out_infos[i].file), size - size / 3, aio_fd_cb, &out_infos[i].aio_infos[1]));
+       }
+
+       // Wait for everyone to finish.
+
+       for(size_t i = 0; i < nchannels; i++) {
+               assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
+               assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+               assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
+               assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
+       }
+
+       // Check that everything is correct.
+
+       for(size_t i = 0; i < nchannels; i++) {
+               assert(fclose(in_infos[i].file) == 0);
+               assert(fclose(out_infos[i].file) == 0);
+
+               // One callback for each AIO buffer.
+               assert(out_infos[i].aio_infos[0].callbacks == 1);
+               assert(out_infos[i].aio_infos[1].callbacks == 1);
+               assert(in_infos[i].aio_infos[0].callbacks == 1);
+               assert(in_infos[i].aio_infos[1].callbacks == 1);
+
+               // Correct size sent and received.
+               assert(out_infos[i].aio_infos[0].size == size / 3);
+               assert(out_infos[i].aio_infos[1].size == size - size / 3);
+               assert(in_infos[i].aio_infos[0].size == size / 4);
+               assert(in_infos[i].aio_infos[1].size == size - size / 4);
+
+               // First batch of data should all be sent and received before the second batch
+               for(size_t j = 0; j < nchannels; j++) {
+                       assert(timercmp(&out_infos[i].aio_infos[0].tv, &out_infos[j].aio_infos[1].tv, <=));
+                       assert(timercmp(&in_infos[i].aio_infos[0].tv, &in_infos[j].aio_infos[1].tv, <=));
+               }
+
+               // Files should be identical
+               char command[PATH_MAX];
+               snprintf(command, sizeof(command), "cmp channels_aio_fd.in channels_aio_fd.out%d", (int)i);
+               assert(system(command) == 0);
+
+       }
+
+       // Clean up.
+
+       meshlink_close(mesh2);
+       meshlink_close(mesh1);
+
+       return 0;
+}
diff --git a/test/channels-aio-fd.test b/test/channels-aio-fd.test
new file mode 100755 (executable)
index 0000000..c754aa0
--- /dev/null
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+rm -Rf channels_aio_fd_conf.*
+./channels-aio-fd