14 static size_t received;
15 static struct sync_flag recv_flag;
16 static struct sync_flag close_flag;
17 static struct sync_flag poll_flag;
18 static struct sync_flag aio_done_flag;
20 static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
27 set_sync_flag(&aio_done_flag, true);
30 static void aio_fd_cb_ignore(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
38 static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
44 set_sync_flag(&close_flag, true);
45 meshlink_channel_close(mesh, channel);
49 set_sync_flag(&recv_flag, true);
52 static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
56 meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
60 static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
62 meshlink_set_channel_poll_cb(mesh, channel, NULL);
63 set_sync_flag(&poll_flag, true);
66 int main(int argc, char *argv[]) {
70 meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
72 // Open two new meshlink instance.
74 meshlink_handle_t *mesh_a, *mesh_b;
75 open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio_fd");
79 meshlink_set_channel_accept_cb(mesh_b, accept_cb);
81 // Start both instances
83 start_meshlink_pair(mesh_a, mesh_b);
85 // Open a channel from a to b.
87 meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
90 meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 1, NULL, NULL, 0);
93 // Wait for the channel to be fully established
95 meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
96 assert(wait_sync_flag(&poll_flag, 10));
98 // Create a UNIX stream socket
101 assert(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
102 assert(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
104 // Enqueue 3 AIO buffers for the same fd
106 assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL));
107 assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb_ignore, NULL));
108 assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL));
110 // Fill the first buffer with two packets
112 char buf[65535] = "";
115 assert(write(fds[0], buf, 100) == 100);
116 assert(wait_sync_flag(&recv_flag, 2));
117 assert(received == 100);
120 assert(!check_sync_flag(&aio_done_flag));
121 set_sync_flag(&recv_flag, false);
122 assert(write(fds[0], buf, 100) == 100);
123 assert(wait_sync_flag(&recv_flag, 2));
124 assert(received == 200);
126 assert(wait_sync_flag(&aio_done_flag, 1));
127 set_sync_flag(&aio_done_flag, false);
129 // Fill half of the second buffer
131 set_sync_flag(&recv_flag, false);
132 assert(write(fds[0], buf, 100) == 100);
133 assert(wait_sync_flag(&recv_flag, 2));
134 assert(received == 300);
136 // Send one packet that spans two AIO buffers
139 assert(!check_sync_flag(&aio_done_flag));
140 assert(write(fds[0], buf, 300) == 300);
141 assert(wait_sync_flag(&aio_done_flag, 10));
143 // Close the channel and wait for the remaining data
145 meshlink_channel_close(mesh_a, channel);
146 assert(wait_sync_flag(&close_flag, 10));
147 assert(received == 600);
149 // Create a UDP channel
151 channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP);
154 // Wait for the channel to be fully established
157 set_sync_flag(&poll_flag, false);
158 set_sync_flag(&recv_flag, false);
159 set_sync_flag(&close_flag, false);
160 meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
161 assert(wait_sync_flag(&poll_flag, 10));
163 // Enqueue a huge AIO buffer
165 set_sync_flag(&aio_done_flag, false);
166 assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], -1, aio_fd_cb, NULL));
168 // Send a small and a big packets
170 assert(write(fds[0], buf, 100) == 100);
171 assert(wait_sync_flag(&recv_flag, 2));
172 assert(received == 100);
175 assert(!check_sync_flag(&aio_done_flag));
176 set_sync_flag(&recv_flag, false);
177 assert(write(fds[0], buf, 65535) == 65535);
178 assert(wait_sync_flag(&recv_flag, 2));
179 assert(received == 65635);
181 // Close the fds, this should terminate the AIO buffer
184 assert(!check_sync_flag(&aio_done_flag));
186 assert(wait_sync_flag(&aio_done_flag, 10));
189 meshlink_channel_close(mesh_a, channel);
190 assert(wait_sync_flag(&close_flag, 10));
191 assert(received == 65635);
195 close_meshlink_pair(mesh_a, mesh_b);