#include "meshlink.h"
#include "utils.h"
-static const size_t size = 10000000; // size of data to transfer
-static bool bar_reachable = false;
-static int bar_callbacks = 0;
-static int foo_callbacks = 0;
-static size_t bar_received = 0;
-static struct sync_flag bar_finished_flag;
-
-void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) {
- static struct timeval tv0;
- struct timeval tv;
-
- if(tv0.tv_sec == 0) {
- gettimeofday(&tv0, NULL);
- }
-
- gettimeofday(&tv, NULL);
- fprintf(stderr, "%u.%.03u ", (unsigned int)(tv.tv_sec - tv0.tv_sec), (unsigned int)tv.tv_usec / 1000);
-
- if(mesh) {
- fprintf(stderr, "(%s) ", mesh->name);
- }
+static const size_t size = 25000000; // size of data to transfer
+static const size_t smallsize = 100000; // size of the data to transfer without AIO
+static const size_t nchannels = 4; // number of simultaneous channels
- fprintf(stderr, "[%d] %s\n", level, text);
-}
+struct aio_info {
+ int callbacks;
+ size_t size;
+ struct timeval tv;
+ struct sync_flag flag;
+};
-void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) {
- (void)mesh;
+struct channel_info {
+ char *data;
+ struct aio_info aio_infos[2];
+};
- if(!strcmp(node->name, "bar")) {
- bar_reachable = reachable;
- }
-}
+static size_t bar_received_len;
+static struct timeval bar_received_tv;
+static struct sync_flag bar_received_flag;
-void foo_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
(void)mesh;
(void)channel;
(void)data;
(void)len;
- (void)priv;
- foo_callbacks++;
+ struct aio_info *info = priv;
+ gettimeofday(&info->tv, NULL);
+ info->callbacks++;
+ info->size += len;
+ set_sync_flag(&info->flag, true);
}
-void bar_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
(void)mesh;
(void)channel;
+ (void)port;
(void)data;
(void)len;
- (void)priv;
- bar_callbacks++;
- bar_received += len;
-
- if(bar_received >= size) {
- set_sync_flag(&bar_finished_flag, true);
- }
+ return false;
}
-bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
(void)mesh;
(void)channel;
- (void)port;
(void)data;
- (void)len;
- return false;
+ bar_received_len += len;
+
+ if(bar_received_len >= smallsize) {
+ gettimeofday(&bar_received_tv, NULL);
+ set_sync_flag(&bar_received_flag, true);
+ }
}
-bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
- assert(port == 7);
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+ assert(port && port <= nchannels + 1);
assert(!data);
assert(!len);
- char *outdata = mesh->priv;
-
- meshlink_set_channel_receive_cb(mesh, channel, NULL);
- assert(meshlink_channel_aio_receive(mesh, channel, outdata, size / 4, bar_aio_cb, NULL));
- assert(meshlink_channel_aio_receive(mesh, channel, outdata + size / 4, size - size / 4, bar_aio_cb, NULL));
- return true;
-}
+ if(port <= nchannels) {
+ struct channel_info *infos = mesh->priv;
+ struct channel_info *info = &infos[port - 1];
-void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
- (void)len;
+ assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb, &info->aio_infos[0]));
+ assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]));
+ } else {
+ meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
+ }
- meshlink_set_channel_poll_cb(mesh, channel, NULL);
- assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5);
+ return true;
}
int main(int argc, char *argv[]) {
// Prepare data buffers
char *outdata = malloc(size);
- char *indata = malloc(size);
assert(outdata);
- assert(indata);
for(size_t i = 0; i < size; i++) {
outdata[i] = i;
}
- memset(indata, 0, size);
+ struct channel_info in_infos[nchannels];
+
+ struct channel_info out_infos[nchannels];
- meshlink_set_log_cb(NULL, MESHLINK_DEBUG, log_cb);
+ memset(in_infos, 0, sizeof(in_infos));
+
+ memset(out_infos, 0, sizeof(out_infos));
+
+ for(size_t i = 0; i < nchannels; i++) {
+ in_infos[i].data = malloc(size);
+ assert(in_infos[i].data);
+ out_infos[i].data = outdata;
+ }
// Open two new meshlink instance.
meshlink_handle_t *mesh2 = meshlink_open("channels_aio_conf.2", "bar", "channels", DEV_CLASS_BACKBONE);
assert(mesh2);
- mesh2->priv = indata;
-
- meshlink_set_log_cb(mesh1, MESHLINK_DEBUG, log_cb);
- meshlink_set_log_cb(mesh2, MESHLINK_DEBUG, log_cb);
+ mesh2->priv = in_infos;
meshlink_enable_discovery(mesh1, false);
meshlink_enable_discovery(mesh2, false);
meshlink_set_channel_accept_cb(mesh1, reject_cb);
meshlink_set_channel_accept_cb(mesh2, accept_cb);
- meshlink_set_node_status_cb(mesh1, status_cb);
-
// Start both instances
assert(meshlink_start(mesh1));
assert(meshlink_start(mesh2));
- // Open a channel from foo to bar.
+ // Open channels from foo to bar.
meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
assert(bar);
- meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, NULL, NULL, 0);
- assert(channel);
+ meshlink_channel_t *channels[nchannels + 1];
+
+ for(size_t i = 0; i < nchannels + 1; i++) {
+ channels[i] = meshlink_channel_open(mesh1, bar, i + 1, NULL, NULL, 0);
+ assert(channels[i]);
+ }
+
+ // Send a large buffer of data on each channel.
- // Send a large buffer of data.
+ for(size_t i = 0; i < nchannels; i++) {
+ assert(meshlink_channel_aio_send(mesh1, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
+ assert(meshlink_channel_aio_send(mesh1, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
+ }
- assert(meshlink_channel_aio_send(mesh1, channel, outdata, size / 2, foo_aio_cb, NULL));
- assert(meshlink_channel_aio_send(mesh1, channel, outdata + size / 2, size - size / 2, foo_aio_cb, NULL));
+ // Send a little bit on the last channel using a regular send
- assert(wait_sync_flag(&bar_finished_flag, 10));
+ assert(meshlink_channel_send(mesh1, channels[nchannels], outdata, smallsize) == smallsize);
- assert(foo_callbacks == 2);
- assert(bar_callbacks == 2);
- assert(bar_received == size);
- assert(!memcmp(indata, outdata, size));
+ // Wait for everyone to finish.
+
+ assert(wait_sync_flag(&bar_received_flag, 10));
+
+ for(size_t i = 0; i < nchannels; i++) {
+ assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
+ assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+ assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
+ assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
+ }
+
+ // Check that everything is correct.
+
+ assert(bar_received_len == smallsize);
+
+ for(size_t i = 0; i < nchannels; i++) {
+ // Data should be transferred intact.
+ assert(!memcmp(in_infos[i].data, out_infos[i].data, size));
+
+ // One callback for each AIO buffer.
+ assert(out_infos[i].aio_infos[0].callbacks == 1);
+ assert(out_infos[i].aio_infos[1].callbacks == 1);
+ assert(in_infos[i].aio_infos[0].callbacks == 1);
+ assert(in_infos[i].aio_infos[1].callbacks == 1);
+
+ // Correct size sent and received.
+ assert(out_infos[i].aio_infos[0].size == size / 3);
+ assert(out_infos[i].aio_infos[1].size == size - size / 3);
+ assert(in_infos[i].aio_infos[0].size == size / 4);
+ assert(in_infos[i].aio_infos[1].size == size - size / 4);
+
+ // First batch of data should all be sent and received before the second batch
+ for(size_t j = 0; j < nchannels; j++) {
+ assert(timercmp(&out_infos[i].aio_infos[0].tv, &out_infos[j].aio_infos[1].tv, <=));
+ assert(timercmp(&in_infos[i].aio_infos[0].tv, &in_infos[j].aio_infos[1].tv, <=));
+ }
+
+ // The non-AIO transfer should have completed before everything else
+ assert(timercmp(&out_infos[i].aio_infos[0].tv, &bar_received_tv, >=));
+ assert(timercmp(&in_infos[i].aio_infos[0].tv, &bar_received_tv, >=));
+ }
// Clean up.