This adds support to enqueue transmits between channels and filedescriptors.
Currently, it requires that read() and write() calls on the filedescriptors
are non-blocking and always succeed, which limits it to reading from and
writing to files.
*/
typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv);
+/// A callback for asynchronous I/O to and from filedescriptors.
+/** This callbacks signals that MeshLink has finished using this filedescriptor.
+ *
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel which used this filedescriptor.
+ * @param fd The filedescriptor that was used.
+ * @param len The length of the data that was successfully sent or received.
+ * @param priv A private pointer which was set by the application when submitting the buffer.
+ */
+typedef void (*aio_fd_cb_t)(mesh *mesh, channel *channel, int fd, size_t len, void *priv);
+
/// A class describing a MeshLink node.
class node: public meshlink_node_t {
};
return meshlink_channel_aio_send(handle, channel, data, len, cb, priv);
}
+ /// Transmit data on a channel asynchronously from a filedescriptor
+ /** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel.
+ * The callback may be returned early if there is an error reading from the filedescriptor.
+ * While there is still with unsent data, the poll callback will not be called.
+ *
+ * @param channel A handle for the channel.
+ * @param fd A file descriptor from which data will be read.
+ * @param len The length of the data, or 0 if there is no data to send.
+ * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+ *
+ * @return True if the buffer was enqueued, false otherwise.
+ */
+ bool channel_aio_fd_send(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+ return meshlink_channel_aio_fd_send(handle, channel, fd, len, cb, priv);
+ }
+
/// Receive data on a channel asynchronously
/** This registers a buffer that will be filled with incoming channel data.
* Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv);
}
+ /// Receive data on a channel asynchronously and send it to a filedescriptor
+ /** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor.
+ * The callback may be returned early if there is an error writing to the filedescriptor.
+ * While there is still unread data, the receive callback will not be called.
+ *
+ * @param channel A handle for the channel.
+ * @param fd A file descriptor to which data will be written.
+ * @param len The length of the data.
+ * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+ *
+ * @return True if the buffer was enqueued, false otherwise.
+ */
+ bool channel_aio_fd_receive(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+ return meshlink_channel_aio_fd_receive(handle, channel, fd, len, cb, priv);
+ }
+
/// Get the amount of bytes in the send buffer.
/** This returns the amount of bytes in the send buffer.
* These bytes have not been received by the peer yet.
return mesh->channel_accept_cb;
}
+static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
+ if(aio->data) {
+ if(aio->cb.buffer) {
+ aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+ } else {
+ if(aio->cb.fd) {
+ aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+ }
+ }
+}
+
static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
meshlink_channel_t *channel = connection->priv;
todo = left;
}
- memcpy((char *)aio->data + aio->done, p, todo);
+ if(aio->data) {
+ memcpy((char *)aio->data + aio->done, p, todo);
+ } else {
+ ssize_t result = write(aio->fd, p, todo);
+
+ if(result > 0) {
+ todo = result;
+ }
+ }
+
aio->done += todo;
if(aio->done == aio->len) {
channel->aio_receive = aio->next;
-
- if(aio->cb) {
- aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
- }
-
+ aio_signal(mesh, channel, aio);
free(aio);
}
if(aio) {
/* We at least one AIO buffer. Send as much as possible form the first buffer. */
size_t left = aio->len - aio->done;
+ ssize_t sent;
if(len > left) {
len = left;
}
- ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+ if(aio->data) {
+ sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+ } else {
+ char buf[65536];
+ size_t todo = utcp_get_sndbuf_free(connection);
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ if(todo > sizeof(buf)) {
+ todo = sizeof(buf);
+ }
+
+ ssize_t result = read(aio->fd, buf, todo);
+
+ if(result > 0) {
+ sent = utcp_send(connection, buf, result);
+ } else {
+ sent = result;
+ }
+ }
if(sent >= 0) {
aio->done += sent;
/* If the buffer is now completely sent, call the callback and dispose of it. */
if(aio->done >= aio->len) {
channel->aio_send = aio->next;
-
- if(aio->cb) {
- aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
- }
-
+ aio_signal(mesh, channel, aio);
free(aio);
}
} else {
/* Clean up any outstanding AIO buffers. */
for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
next = aio->next;
-
- if(aio->cb) {
- aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
- }
-
+ aio_signal(mesh, channel, aio);
free(aio);
}
for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
next = aio->next;
-
- if(aio->cb) {
- aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
- }
-
+ aio_signal(mesh, channel, aio);
free(aio);
}
meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
aio->data = data;
aio->len = len;
- aio->cb = cb;
+ aio->cb.buffer = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_send;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ /* Ensure the poll callback is set, and call it right now to push data if possible */
+ utcp_set_poll_cb(channel->c, channel_poll);
+ channel_poll(channel->c, len);
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
+bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || fd == -1) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->fd = fd;
+ aio->len = len;
+ aio->cb.fd = cb;
aio->priv = priv;
pthread_mutex_lock(&mesh->mesh_mutex);
meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
aio->data = data;
aio->len = len;
- aio->cb = cb;
+ aio->cb.buffer = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
+bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || fd == -1) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->fd = fd;
+ aio->len = len;
+ aio->cb.fd = cb;
aio->priv = priv;
pthread_mutex_lock(&mesh->mesh_mutex);
* @param data A pointer to a buffer containing the enqueued data.
* @param len The length of the buffer.
* @param priv A private pointer which was set by the application when submitting the buffer.
-};
*/
typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv);
+/// A callback for asynchronous I/O to and from filedescriptors.
+/** This callbacks signals that MeshLink has finished using this filedescriptor.
+ *
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel which used this filedescriptor.
+ * @param fd The filedescriptor that was used.
+ * @param len The length of the data that was successfully sent or received.
+ * @param priv A private pointer which was set by the application when submitting the buffer.
+ */
+typedef void (*meshlink_aio_fd_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv);
+
/// Transmit data on a channel asynchronously
/** This registers a buffer that will be used to send data to the remote node.
* Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
*/
extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
+/// Transmit data on a channel asynchronously from a filedescriptor
+/** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel.
+ * The callback may be returned early if there is an error reading from the filedescriptor.
+ * While there is still with unsent data, the poll callback will not be called.
+ *
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel.
+ * @param fd A file descriptor from which data will be read.
+ * @param len The length of the data, or 0 if there is no data to send.
+ * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+ *
+ * @return True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv);
+
/// Receive data on a channel asynchronously
/** This registers a buffer that will be filled with incoming channel data.
* Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
*/
extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
+/// Receive data on a channel asynchronously and send it to a filedescriptor
+/** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor.
+ * The callback may be returned early if there is an error writing to the filedescriptor.
+ * While there is still unread data, the receive callback will not be called.
+ *
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel.
+ * @param fd A file descriptor to which data will be written.
+ * @param len The length of the data.
+ * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor.
+ *
+ * @return True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv);
+
/// Get channel flags.
/** This returns the flags used when opening this channel.
*
meshlink_add_address
meshlink_add_external_address
meshlink_blacklist
+meshlink_channel_aio_fd_receive
+meshlink_channel_aio_fd_send
meshlink_channel_aio_receive
meshlink_channel_aio_send
meshlink_channel_close
/// An AIO buffer.
typedef struct meshlink_aio_buffer {
const void *data;
+ int fd;
size_t len;
size_t done;
- meshlink_aio_cb_t cb;
+ union {
+ meshlink_aio_cb_t buffer;
+ meshlink_aio_fd_cb_t fd;
+ } cb;
void *priv;
struct meshlink_aio_buffer *next;
} meshlink_aio_buffer_t;
basicpp.test \
channels.test \
channels-aio.test \
+ channels-aio-fd.test \
channels-cornercases.test \
channels-failure.test \
channels-fork.test \
basicpp \
channels \
channels-aio \
+ channels-aio-fd \
channels-cornercases \
channels-failure \
channels-fork \
channels_aio_SOURCES = channels-aio.c utils.c utils.h
channels_aio_LDADD = ../src/libmeshlink.la
+channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h
+channels_aio_fd_LDADD = ../src/libmeshlink.la
+
channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
channels_no_partial_LDADD = ../src/libmeshlink.la
--- /dev/null
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <limits.h>
+
+#include "meshlink.h"
+#include "utils.h"
+
+static const size_t size = 1024 * 1024; // size of data to transfer
+static const size_t nchannels = 4; // number of simultaneous channels
+
+struct aio_info {
+ int callbacks;
+ size_t size;
+ struct timeval tv;
+ struct sync_flag flag;
+};
+
+struct channel_info {
+ FILE *file;
+ struct aio_info aio_infos[2];
+};
+
+static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
+ (void)mesh;
+ (void)channel;
+ (void)fd;
+ (void)len;
+
+ struct aio_info *info = priv;
+ gettimeofday(&info->tv, NULL);
+ info->callbacks++;
+ info->size += len;
+ set_sync_flag(&info->flag, true);
+}
+
+static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+ (void)mesh;
+ (void)channel;
+ (void)port;
+ (void)data;
+ (void)len;
+
+ return false;
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+ assert(port && port <= nchannels);
+ assert(!data);
+ assert(!len);
+
+ struct channel_info *infos = mesh->priv;
+ struct channel_info *info = &infos[port - 1];
+
+ assert(meshlink_channel_aio_fd_receive(mesh, channel, fileno(info->file), size / 4, aio_fd_cb, &info->aio_infos[0]));
+ assert(meshlink_channel_aio_fd_receive(mesh, channel, fileno(info->file), size - size / 4, aio_fd_cb, &info->aio_infos[1]));
+
+ return true;
+}
+
+int main(int argc, char *argv[]) {
+ (void)argc;
+ (void)argv;
+
+ // Prepare file
+
+ char *outdata = malloc(size);
+
+ assert(outdata);
+
+ for(size_t i = 0; i < size; i++) {
+ // Human readable output
+ outdata[i] = i % 96 ? i % 96 + 32 : '\n';
+ }
+
+ FILE *file = fopen("channels_aio_fd.in", "w");
+ assert(file);
+ assert(fwrite(outdata, size, 1, file) == 1);
+ assert(fclose(file) == 0);
+
+ struct channel_info in_infos[nchannels];
+ struct channel_info out_infos[nchannels];
+
+ memset(in_infos, 0, sizeof(in_infos));
+ memset(out_infos, 0, sizeof(out_infos));
+
+ for(size_t i = 0; i < nchannels; i++) {
+ char filename[PATH_MAX];
+ snprintf(filename, sizeof(filename), "channels_aio_fd.out%d", (int)i);
+ in_infos[i].file = fopen(filename, "w");
+ assert(in_infos[i].file);
+ out_infos[i].file = fopen("channels_aio_fd.in", "r");
+ assert(out_infos[i].file);
+ }
+
+ // Open two new meshlink instance.
+
+ meshlink_destroy("channels_aio_fd_conf.1");
+ meshlink_destroy("channels_aio_fd_conf.2");
+
+ meshlink_handle_t *mesh1 = meshlink_open("channels_aio_fd_conf.1", "foo", "channels", DEV_CLASS_BACKBONE);
+ assert(mesh1);
+
+ meshlink_handle_t *mesh2 = meshlink_open("channels_aio_fd_conf.2", "bar", "channels", DEV_CLASS_BACKBONE);
+ assert(mesh2);
+
+ mesh2->priv = in_infos;
+
+ meshlink_enable_discovery(mesh1, false);
+ meshlink_enable_discovery(mesh2, false);
+
+ // Import and export both side's data
+
+ meshlink_add_address(mesh1, "localhost");
+
+ char *data = meshlink_export(mesh1);
+ assert(data);
+ assert(meshlink_import(mesh2, data));
+ free(data);
+
+ data = meshlink_export(mesh2);
+ assert(data);
+ assert(meshlink_import(mesh1, data));
+ free(data);
+
+ // Set the callbacks.
+
+ meshlink_set_channel_accept_cb(mesh1, reject_cb);
+ meshlink_set_channel_accept_cb(mesh2, accept_cb);
+
+ // Start both instances
+
+ assert(meshlink_start(mesh1));
+ assert(meshlink_start(mesh2));
+
+ // Open channels from foo to bar.
+
+ meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
+ assert(bar);
+
+ meshlink_channel_t *channels[nchannels];
+
+ for(size_t i = 0; i < nchannels; i++) {
+ channels[i] = meshlink_channel_open(mesh1, bar, i + 1, NULL, NULL, 0);
+ assert(channels[i]);
+ }
+
+ // Send a large buffer of data on each channel.
+
+ for(size_t i = 0; i < nchannels; i++) {
+ assert(meshlink_channel_aio_fd_send(mesh1, channels[i], fileno(out_infos[i].file), size / 3, aio_fd_cb, &out_infos[i].aio_infos[0]));
+ assert(meshlink_channel_aio_fd_send(mesh1, channels[i], fileno(out_infos[i].file), size - size / 3, aio_fd_cb, &out_infos[i].aio_infos[1]));
+ }
+
+ // Wait for everyone to finish.
+
+ for(size_t i = 0; i < nchannels; i++) {
+ assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
+ assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+ assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
+ assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
+ }
+
+ // Check that everything is correct.
+
+ for(size_t i = 0; i < nchannels; i++) {
+ assert(fclose(in_infos[i].file) == 0);
+ assert(fclose(out_infos[i].file) == 0);
+
+ // One callback for each AIO buffer.
+ assert(out_infos[i].aio_infos[0].callbacks == 1);
+ assert(out_infos[i].aio_infos[1].callbacks == 1);
+ assert(in_infos[i].aio_infos[0].callbacks == 1);
+ assert(in_infos[i].aio_infos[1].callbacks == 1);
+
+ // Correct size sent and received.
+ assert(out_infos[i].aio_infos[0].size == size / 3);
+ assert(out_infos[i].aio_infos[1].size == size - size / 3);
+ assert(in_infos[i].aio_infos[0].size == size / 4);
+ assert(in_infos[i].aio_infos[1].size == size - size / 4);
+
+ // First batch of data should all be sent and received before the second batch
+ for(size_t j = 0; j < nchannels; j++) {
+ assert(timercmp(&out_infos[i].aio_infos[0].tv, &out_infos[j].aio_infos[1].tv, <=));
+ assert(timercmp(&in_infos[i].aio_infos[0].tv, &in_infos[j].aio_infos[1].tv, <=));
+ }
+
+ // Files should be identical
+ char command[PATH_MAX];
+ snprintf(command, sizeof(command), "cmp channels_aio_fd.in channels_aio_fd.out%d", (int)i);
+ assert(system(command) == 0);
+
+ }
+
+ // Clean up.
+
+ meshlink_close(mesh2);
+ meshlink_close(mesh1);
+
+ return 0;
+}
--- /dev/null
+#!/bin/sh
+
+rm -Rf channels_aio_fd_conf.*
+./channels-aio-fd