#include "meshlink.h"
#include "utils.h"
-static const size_t size = 10000000; // size of data to transfer
+static const size_t size = 12000000; // size of data to transfer
struct aio_info {
int port;
struct aio_info aio_infos[2];
};
-static size_t b_received_len;
-static struct timeval b_received_tv;
static struct sync_flag b_received_flag;
static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
switch(port) {
case 1:
case 3:
- meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb, &info->aio_infos[0]);
- meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb_close, &info->aio_infos[1]);
+ assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb, &info->aio_infos[0]));
+ assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb_close, &info->aio_infos[1]));
break;
case 2:
case 4:
- meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]);
- meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]);
- set_sync_flag(&info->aio_infos[1].flag, true);
+ assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]));
+ assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]));
break;
default:
return true;
}
-int main(int argc, char *argv[]) {
- (void)argc;
- (void)argv;
+int main(void) {
+ init_sync_flag(&b_received_flag);
meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
memset(out_infos, 0, sizeof(out_infos));
for(size_t i = 0; i < nchannels; i++) {
+ init_sync_flag(&in_infos[i].aio_infos[0].flag);
+ init_sync_flag(&in_infos[i].aio_infos[1].flag);
+ init_sync_flag(&out_infos[i].aio_infos[0].flag);
+ init_sync_flag(&out_infos[i].aio_infos[1].flag);
+
in_infos[i].data = malloc(size);
assert(in_infos[i].data);
out_infos[i].data = outdata;
// Open two new meshlink instance.
meshlink_handle_t *mesh_a, *mesh_b;
- open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio");
+ open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio_cornercases");
// Set the callbacks.
if(i < 2) {
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb_close, &out_infos[i].aio_infos[1]));
- assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
- assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
} else {
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb_close, &out_infos[i].aio_infos[0]));
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
- assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
- set_sync_flag(&out_infos[i].aio_infos[1].flag, true);
}
}
// Wait for all AIO buffers to finish.
for(size_t i = 0; i < nchannels; i++) {
+ // The first chunk should always have succeeded
assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
- assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
- assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+
+ // The second chunk should only have completed if we didn't close the channel yet
+ if(i % 2) {
+ assert(!check_sync_flag(&in_infos[i].aio_infos[1].flag));
+ } else {
+ assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
+ }
+
+ if(i < 2) {
+ assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+ } else {
+ assert(!check_sync_flag(&out_infos[i].aio_infos[1].flag));
+ }
+
}
// Check that everything is correct.