X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=test%2Fchannels-aio.c;h=bf4b870410e49b1dba5ba0fba4afe23b088a3fbd;hb=9a2520c36431a8a5fd90451e97f488c22f4decc5;hp=ef4c0a3fd1c27fe37a53466fa6c6433fa5c77292;hpb=57114d942004e8a34ff22aadc0c620a0aabbb423;p=meshlink diff --git a/test/channels-aio.c b/test/channels-aio.c index ef4c0a3f..bf4b8704 100644 --- a/test/channels-aio.c +++ b/test/channels-aio.c @@ -8,183 +8,191 @@ #include "meshlink.h" #include "utils.h" -static const size_t size = 10000000; // size of data to transfer -static bool bar_reachable = false; -static int bar_callbacks = 0; -static int foo_callbacks = 0; -static size_t bar_received = 0; -static struct sync_flag bar_finished_flag; - -void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) { - static struct timeval tv0; - struct timeval tv; - - if(tv0.tv_sec == 0) { - gettimeofday(&tv0, NULL); - } - - gettimeofday(&tv, NULL); - fprintf(stderr, "%u.%.03u ", (unsigned int)(tv.tv_sec - tv0.tv_sec), (unsigned int)tv.tv_usec / 1000); - - if(mesh) { - fprintf(stderr, "(%s) ", mesh->name); - } +static const size_t size = 25000000; // size of data to transfer +static const size_t smallsize = 100000; // size of the data to transfer without AIO +static const size_t nchannels = 4; // number of simultaneous channels - fprintf(stderr, "[%d] %s\n", level, text); -} +struct aio_info { + int callbacks; + size_t size; + struct timeval tv; + struct sync_flag flag; +}; -void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) { - (void)mesh; +struct channel_info { + char *data; + struct aio_info aio_infos[2]; +}; - if(!strcmp(node->name, "bar")) { - bar_reachable = reachable; - } -} +static size_t b_received_len; +static struct timeval b_received_tv; +static struct sync_flag b_received_flag; -void foo_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { +static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { (void)mesh; (void)channel; (void)data; (void)len; - (void)priv; - foo_callbacks++; + struct aio_info *info = priv; + gettimeofday(&info->tv, NULL); + info->callbacks++; + info->size += len; + set_sync_flag(&info->flag, true); } -void bar_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { +static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { (void)mesh; (void)channel; + (void)port; (void)data; (void)len; - (void)priv; - - bar_callbacks++; - bar_received += len; - if(bar_received >= size) { - set_sync_flag(&bar_finished_flag, true); - } + return false; } -bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { +static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { (void)mesh; (void)channel; - (void)port; (void)data; - (void)len; - return false; + b_received_len += len; + + if(b_received_len >= smallsize) { + gettimeofday(&b_received_tv, NULL); + set_sync_flag(&b_received_flag, true); + } } -bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - assert(port == 7); +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + assert(port && port <= nchannels + 1); assert(!data); assert(!len); - char *outdata = mesh->priv; - meshlink_set_channel_receive_cb(mesh, channel, NULL); - assert(meshlink_channel_aio_receive(mesh, channel, outdata, size / 4, bar_aio_cb, NULL)); - assert(meshlink_channel_aio_receive(mesh, channel, outdata + size / 4, size - size / 4, bar_aio_cb, NULL)); + if(port <= nchannels) { + struct channel_info *infos = mesh->priv; + struct channel_info *info = &infos[port - 1]; - return true; -} - -void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) { - (void)len; + assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb, &info->aio_infos[0])); + assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1])); + } else { + meshlink_set_channel_receive_cb(mesh, channel, receive_cb); + } - meshlink_set_channel_poll_cb(mesh, channel, NULL); - assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5); + return true; } int main(int argc, char *argv[]) { (void)argc; (void)argv; + meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb); + // Prepare data buffers char *outdata = malloc(size); - char *indata = malloc(size); - assert(outdata); - assert(indata); for(size_t i = 0; i < size; i++) { outdata[i] = i; } - memset(indata, 0, size); + struct channel_info in_infos[nchannels]; - meshlink_set_log_cb(NULL, MESHLINK_DEBUG, log_cb); + struct channel_info out_infos[nchannels]; + + memset(in_infos, 0, sizeof(in_infos)); + + memset(out_infos, 0, sizeof(out_infos)); + + for(size_t i = 0; i < nchannels; i++) { + in_infos[i].data = malloc(size); + assert(in_infos[i].data); + out_infos[i].data = outdata; + } // Open two new meshlink instance. - meshlink_destroy("channels_aio_conf.1"); - meshlink_destroy("channels_aio_conf.2"); + meshlink_handle_t *mesh_a, *mesh_b; + open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio"); - meshlink_handle_t *mesh1 = meshlink_open("channels_aio_conf.1", "foo", "channels", DEV_CLASS_BACKBONE); - assert(mesh1); + // Set the callbacks. - meshlink_handle_t *mesh2 = meshlink_open("channels_aio_conf.2", "bar", "channels", DEV_CLASS_BACKBONE); - assert(mesh2); + mesh_b->priv = in_infos; - mesh2->priv = indata; + meshlink_set_channel_accept_cb(mesh_a, reject_cb); + meshlink_set_channel_accept_cb(mesh_b, accept_cb); - meshlink_set_log_cb(mesh1, MESHLINK_DEBUG, log_cb); - meshlink_set_log_cb(mesh2, MESHLINK_DEBUG, log_cb); + // Start both instances - meshlink_enable_discovery(mesh1, false); - meshlink_enable_discovery(mesh2, false); + start_meshlink_pair(mesh_a, mesh_b); - // Import and export both side's data + // Open channels from a to b. - meshlink_add_address(mesh1, "localhost"); + meshlink_node_t *b = meshlink_get_node(mesh_a, "b"); + assert(b); - char *data = meshlink_export(mesh1); - assert(data); - assert(meshlink_import(mesh2, data)); - free(data); + meshlink_channel_t *channels[nchannels + 1]; - data = meshlink_export(mesh2); - assert(data); - assert(meshlink_import(mesh1, data)); - free(data); + for(size_t i = 0; i < nchannels + 1; i++) { + channels[i] = meshlink_channel_open(mesh_a, b, i + 1, NULL, NULL, 0); + assert(channels[i]); + } - // Set the callbacks. + // Send a large buffer of data on each channel. - meshlink_set_channel_accept_cb(mesh1, reject_cb); - meshlink_set_channel_accept_cb(mesh2, accept_cb); + for(size_t i = 0; i < nchannels; i++) { + assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0])); + assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1])); + } - meshlink_set_node_status_cb(mesh1, status_cb); + // Send a little bit on the last channel using a regular send - // Start both instances + assert(meshlink_channel_send(mesh_a, channels[nchannels], outdata, smallsize) == (ssize_t)smallsize); - assert(meshlink_start(mesh1)); - assert(meshlink_start(mesh2)); + // Wait for everyone to finish. - // Open a channel from foo to bar. + assert(wait_sync_flag(&b_received_flag, 10)); - meshlink_node_t *bar = meshlink_get_node(mesh1, "bar"); - assert(bar); + for(size_t i = 0; i < nchannels; i++) { + assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10)); + assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10)); + assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10)); + assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10)); + } - meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, NULL, NULL, 0); - assert(channel); + // Check that everything is correct. - // Send a large buffer of data. + assert(b_received_len == smallsize); - assert(meshlink_channel_aio_send(mesh1, channel, outdata, size / 2, foo_aio_cb, NULL)); - assert(meshlink_channel_aio_send(mesh1, channel, outdata + size / 2, size - size / 2, foo_aio_cb, NULL)); + for(size_t i = 0; i < nchannels; i++) { + // Data should be transferred intact. + assert(!memcmp(in_infos[i].data, out_infos[i].data, size)); - assert(wait_sync_flag(&bar_finished_flag, 10)); + // One callback for each AIO buffer. + assert(out_infos[i].aio_infos[0].callbacks == 1); + assert(out_infos[i].aio_infos[1].callbacks == 1); + assert(in_infos[i].aio_infos[0].callbacks == 1); + assert(in_infos[i].aio_infos[1].callbacks == 1); - assert(foo_callbacks == 2); - assert(bar_callbacks == 2); - assert(bar_received == size); - assert(!memcmp(indata, outdata, size)); + // Correct size sent and received. + assert(out_infos[i].aio_infos[0].size == size / 3); + assert(out_infos[i].aio_infos[1].size == size - size / 3); + assert(in_infos[i].aio_infos[0].size == size / 4); + assert(in_infos[i].aio_infos[1].size == size - size / 4); - // Clean up. + // First batch of data should all be sent and received before the second batch + for(size_t j = 0; j < nchannels; j++) { + assert(timercmp(&out_infos[i].aio_infos[0].tv, &out_infos[j].aio_infos[1].tv, <=)); + assert(timercmp(&in_infos[i].aio_infos[0].tv, &in_infos[j].aio_infos[1].tv, <=)); + } - meshlink_close(mesh2); - meshlink_close(mesh1); + // The non-AIO transfer should have completed before everything else + assert(timercmp(&out_infos[i].aio_infos[0].tv, &b_received_tv, >=)); + assert(timercmp(&in_infos[i].aio_infos[0].tv, &b_received_tv, >=)); + } + + // Clean up. - return 0; + close_meshlink_pair(mesh_a, mesh_b); }