From 69ca435ff7948071326bf43796b5c375ac7c1c49 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Thu, 15 Aug 2019 22:41:15 +0200 Subject: [PATCH] Test concurrent AIO and non-AIO transfers. Create 5 channels; 4 transfer a large amount of data via AIO, the 5th does a regular meshlink_channel_send(). Verify that there is concurrency between all channels. --- test/channels-aio.c | 190 ++++++++++++++++++++++++++------------------ 1 file changed, 112 insertions(+), 78 deletions(-) diff --git a/test/channels-aio.c b/test/channels-aio.c index ef4c0a3f..42cdadd5 100644 --- a/test/channels-aio.c +++ b/test/channels-aio.c @@ -8,92 +8,78 @@ #include "meshlink.h" #include "utils.h" -static const size_t size = 10000000; // size of data to transfer -static bool bar_reachable = false; -static int bar_callbacks = 0; -static int foo_callbacks = 0; -static size_t bar_received = 0; -static struct sync_flag bar_finished_flag; - -void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) { - static struct timeval tv0; - struct timeval tv; - - if(tv0.tv_sec == 0) { - gettimeofday(&tv0, NULL); - } - - gettimeofday(&tv, NULL); - fprintf(stderr, "%u.%.03u ", (unsigned int)(tv.tv_sec - tv0.tv_sec), (unsigned int)tv.tv_usec / 1000); - - if(mesh) { - fprintf(stderr, "(%s) ", mesh->name); - } +static const size_t size = 25000000; // size of data to transfer +static const size_t smallsize = 100000; // size of the data to transfer without AIO +static const size_t nchannels = 4; // number of simultaneous channels - fprintf(stderr, "[%d] %s\n", level, text); -} +struct aio_info { + int callbacks; + size_t size; + struct timeval tv; + struct sync_flag flag; +}; -void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) { - (void)mesh; +struct channel_info { + char *data; + struct aio_info aio_infos[2]; +}; - if(!strcmp(node->name, "bar")) { - bar_reachable = reachable; - } -} +static size_t bar_received_len; +static struct timeval bar_received_tv; +static struct sync_flag bar_received_flag; -void foo_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { +static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { (void)mesh; (void)channel; (void)data; (void)len; - (void)priv; - foo_callbacks++; + struct aio_info *info = priv; + gettimeofday(&info->tv, NULL); + info->callbacks++; + info->size += len; + set_sync_flag(&info->flag, true); } -void bar_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) { +static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { (void)mesh; (void)channel; + (void)port; (void)data; (void)len; - (void)priv; - bar_callbacks++; - bar_received += len; - - if(bar_received >= size) { - set_sync_flag(&bar_finished_flag, true); - } + return false; } -bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { +static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { (void)mesh; (void)channel; - (void)port; (void)data; - (void)len; - return false; + bar_received_len += len; + + if(bar_received_len >= smallsize) { + gettimeofday(&bar_received_tv, NULL); + set_sync_flag(&bar_received_flag, true); + } } -bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - assert(port == 7); +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + assert(port && port <= nchannels + 1); assert(!data); assert(!len); - char *outdata = mesh->priv; - - meshlink_set_channel_receive_cb(mesh, channel, NULL); - assert(meshlink_channel_aio_receive(mesh, channel, outdata, size / 4, bar_aio_cb, NULL)); - assert(meshlink_channel_aio_receive(mesh, channel, outdata + size / 4, size - size / 4, bar_aio_cb, NULL)); - return true; -} + if(port <= nchannels) { + struct channel_info *infos = mesh->priv; + struct channel_info *info = &infos[port - 1]; -void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) { - (void)len; + assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb, &info->aio_infos[0])); + assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1])); + } else { + meshlink_set_channel_receive_cb(mesh, channel, receive_cb); + } - meshlink_set_channel_poll_cb(mesh, channel, NULL); - assert(meshlink_channel_send(mesh, channel, "Hello", 5) == 5); + return true; } int main(int argc, char *argv[]) { @@ -103,18 +89,26 @@ int main(int argc, char *argv[]) { // Prepare data buffers char *outdata = malloc(size); - char *indata = malloc(size); assert(outdata); - assert(indata); for(size_t i = 0; i < size; i++) { outdata[i] = i; } - memset(indata, 0, size); + struct channel_info in_infos[nchannels]; + + struct channel_info out_infos[nchannels]; - meshlink_set_log_cb(NULL, MESHLINK_DEBUG, log_cb); + memset(in_infos, 0, sizeof(in_infos)); + + memset(out_infos, 0, sizeof(out_infos)); + + for(size_t i = 0; i < nchannels; i++) { + in_infos[i].data = malloc(size); + assert(in_infos[i].data); + out_infos[i].data = outdata; + } // Open two new meshlink instance. @@ -127,10 +121,7 @@ int main(int argc, char *argv[]) { meshlink_handle_t *mesh2 = meshlink_open("channels_aio_conf.2", "bar", "channels", DEV_CLASS_BACKBONE); assert(mesh2); - mesh2->priv = indata; - - meshlink_set_log_cb(mesh1, MESHLINK_DEBUG, log_cb); - meshlink_set_log_cb(mesh2, MESHLINK_DEBUG, log_cb); + mesh2->priv = in_infos; meshlink_enable_discovery(mesh1, false); meshlink_enable_discovery(mesh2, false); @@ -154,32 +145,75 @@ int main(int argc, char *argv[]) { meshlink_set_channel_accept_cb(mesh1, reject_cb); meshlink_set_channel_accept_cb(mesh2, accept_cb); - meshlink_set_node_status_cb(mesh1, status_cb); - // Start both instances assert(meshlink_start(mesh1)); assert(meshlink_start(mesh2)); - // Open a channel from foo to bar. + // Open channels from foo to bar. meshlink_node_t *bar = meshlink_get_node(mesh1, "bar"); assert(bar); - meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, NULL, NULL, 0); - assert(channel); + meshlink_channel_t *channels[nchannels + 1]; + + for(size_t i = 0; i < nchannels + 1; i++) { + channels[i] = meshlink_channel_open(mesh1, bar, i + 1, NULL, NULL, 0); + assert(channels[i]); + } + + // Send a large buffer of data on each channel. - // Send a large buffer of data. + for(size_t i = 0; i < nchannels; i++) { + assert(meshlink_channel_aio_send(mesh1, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0])); + assert(meshlink_channel_aio_send(mesh1, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1])); + } - assert(meshlink_channel_aio_send(mesh1, channel, outdata, size / 2, foo_aio_cb, NULL)); - assert(meshlink_channel_aio_send(mesh1, channel, outdata + size / 2, size - size / 2, foo_aio_cb, NULL)); + // Send a little bit on the last channel using a regular send - assert(wait_sync_flag(&bar_finished_flag, 10)); + assert(meshlink_channel_send(mesh1, channels[nchannels], outdata, smallsize) == smallsize); - assert(foo_callbacks == 2); - assert(bar_callbacks == 2); - assert(bar_received == size); - assert(!memcmp(indata, outdata, size)); + // Wait for everyone to finish. + + assert(wait_sync_flag(&bar_received_flag, 10)); + + for(size_t i = 0; i < nchannels; i++) { + assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10)); + assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10)); + assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10)); + assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10)); + } + + // Check that everything is correct. + + assert(bar_received_len == smallsize); + + for(size_t i = 0; i < nchannels; i++) { + // Data should be transferred intact. + assert(!memcmp(in_infos[i].data, out_infos[i].data, size)); + + // One callback for each AIO buffer. + assert(out_infos[i].aio_infos[0].callbacks == 1); + assert(out_infos[i].aio_infos[1].callbacks == 1); + assert(in_infos[i].aio_infos[0].callbacks == 1); + assert(in_infos[i].aio_infos[1].callbacks == 1); + + // Correct size sent and received. + assert(out_infos[i].aio_infos[0].size == size / 3); + assert(out_infos[i].aio_infos[1].size == size - size / 3); + assert(in_infos[i].aio_infos[0].size == size / 4); + assert(in_infos[i].aio_infos[1].size == size - size / 4); + + // First batch of data should all be sent and received before the second batch + for(size_t j = 0; j < nchannels; j++) { + assert(timercmp(&out_infos[i].aio_infos[0].tv, &out_infos[j].aio_infos[1].tv, <=)); + assert(timercmp(&in_infos[i].aio_infos[0].tv, &in_infos[j].aio_infos[1].tv, <=)); + } + + // The non-AIO transfer should have completed before everything else + assert(timercmp(&out_infos[i].aio_infos[0].tv, &bar_received_tv, >=)); + assert(timercmp(&in_infos[i].aio_infos[0].tv, &bar_received_tv, >=)); + } // Clean up. -- 2.39.2