]> git.meshlink.io Git - meshlink/blob - test/channels-aio-cornercases.c
Add a test for AIO callback cornercases.
[meshlink] / test / channels-aio-cornercases.c
1 #ifdef NDEBUG
2 #undef NDEBUG
3 #endif
4
5 #include <assert.h>
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <sys/time.h>
11
12 #include "meshlink.h"
13 #include "utils.h"
14
15 static const size_t size = 10000000; // size of data to transfer
16
17 struct aio_info {
18         int port;
19         int callbacks;
20         size_t size;
21         struct timeval tv;
22         struct sync_flag flag;
23 };
24
25 struct channel_info {
26         char *data;
27         struct aio_info aio_infos[2];
28 };
29
30 static size_t b_received_len;
31 static struct timeval b_received_tv;
32 static struct sync_flag b_received_flag;
33
34 static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
35         (void)mesh;
36         (void)channel;
37         (void)data;
38         (void)len;
39
40         struct aio_info *info = priv;
41
42         fprintf(stderr, "%d:%s aio_cb %s %p %zu\n", info->port, mesh->name, channel->node->name, data, len);
43
44         gettimeofday(&info->tv, NULL);
45         info->callbacks++;
46         info->size += len;
47         set_sync_flag(&info->flag, true);
48 }
49
50 static void aio_cb_close(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
51         aio_cb(mesh, channel, data, len, priv);
52         struct aio_info *info = priv;
53         fprintf(stderr, "%d:%s aio_cb %s closing\n", info->port, mesh->name, channel->node->name);
54         meshlink_channel_close(mesh, channel);
55 }
56
57 static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
58         assert(!data);
59         assert(!len);
60
61         fprintf(stderr, "%d:%s accept_cb %s\n", port, mesh->name, channel->node->name);
62
63         struct channel_info *infos = mesh->priv;
64         struct channel_info *info = &infos[port - 1];
65
66         switch(port) {
67         case 1:
68         case 3:
69                 meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb, &info->aio_infos[0]);
70                 meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb_close, &info->aio_infos[1]);
71                 break;
72
73         case 2:
74         case 4:
75                 meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]);
76                 meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]);
77                 set_sync_flag(&info->aio_infos[1].flag, true);
78                 break;
79
80         default:
81                 return false;
82         }
83
84         return true;
85 }
86
87 int main(int argc, char *argv[]) {
88         (void)argc;
89         (void)argv;
90
91         meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
92
93         // Prepare data buffers
94
95         char *outdata = malloc(size);
96         assert(outdata);
97
98         for(size_t i = 0; i < size; i++) {
99                 outdata[i] = i;
100         }
101
102         static const size_t nchannels = 4;
103         struct channel_info in_infos[nchannels];
104         struct channel_info out_infos[nchannels];
105
106         memset(in_infos, 0, sizeof(in_infos));
107         memset(out_infos, 0, sizeof(out_infos));
108
109         for(size_t i = 0; i < nchannels; i++) {
110                 in_infos[i].data = malloc(size);
111                 assert(in_infos[i].data);
112                 out_infos[i].data = outdata;
113
114                 out_infos[i].aio_infos[0].port = i + 1;
115                 out_infos[i].aio_infos[1].port = i + 1;
116                 in_infos[i].aio_infos[0].port = i + 1;
117                 in_infos[i].aio_infos[1].port = i + 1;
118         }
119
120         // Open two new meshlink instance.
121
122         meshlink_handle_t *mesh_a, *mesh_b;
123         open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio");
124
125         // Set the callbacks.
126
127         mesh_b->priv = in_infos;
128
129         meshlink_set_channel_accept_cb(mesh_b, accept_cb);
130
131         // Start both instances
132
133         start_meshlink_pair(mesh_a, mesh_b);
134         sleep(1);
135
136         // Open channels from a to b.
137
138         meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
139         assert(b);
140
141         meshlink_channel_t *channels[nchannels + 1];
142
143         // Send a large buffer of data on each channel.
144
145         for(size_t i = 0; i < nchannels; i++) {
146                 channels[i] = meshlink_channel_open(mesh_a, b, i + 1, NULL, NULL, 0);
147                 assert(channels[i]);
148
149                 if(i < 2) {
150                         assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
151                         assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb_close, &out_infos[i].aio_infos[1]));
152                         assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
153                         assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
154                 } else {
155                         assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb_close, &out_infos[i].aio_infos[0]));
156                         assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
157                         assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
158                         set_sync_flag(&out_infos[i].aio_infos[1].flag, true);
159                 }
160         }
161
162         // Wait for all AIO buffers to finish.
163
164         for(size_t i = 0; i < nchannels; i++) {
165                 assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
166                 assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
167                 assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
168                 assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
169         }
170
171         // Check that everything is correct.
172
173         assert(!memcmp(in_infos[0].data, out_infos[0].data, size));
174         assert(!memcmp(in_infos[1].data, out_infos[1].data, size / 4));
175         assert(memcmp(in_infos[1].data, out_infos[1].data + size / 4, size - size / 4));
176         assert(!memcmp(in_infos[2].data, out_infos[2].data, size / 3));
177         assert(memcmp(in_infos[2].data, out_infos[2].data + size / 3, size - size / 3));
178         assert(!memcmp(in_infos[3].data, out_infos[3].data, size / 4));
179         assert(memcmp(in_infos[3].data, out_infos[3].data + size / 4, size / 3 - size / 4));
180         assert(memcmp(in_infos[3].data, out_infos[3].data + size / 3, size - size / 3));
181
182         // Clean up.
183
184         close_meshlink_pair(mesh_a, mesh_b);
185
186         free(outdata);
187
188         for(size_t i = 0; i < nchannels; i++) {
189                 free(in_infos[i].data);
190         }
191 }