]> git.meshlink.io Git - meshlink/blob - test/channels-aio-fd-wouldblock.c
Add AIO send fds to the event loop if they would block.
[meshlink] / test / channels-aio-fd-wouldblock.c
1 #ifdef NDEBUG
2 #undef NDEBUG
3 #endif
4
5 #include <assert.h>
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <fcntl.h>
10
11 #include "meshlink.h"
12 #include "utils.h"
13
14 static size_t received;
15 static struct sync_flag recv_flag;
16 static struct sync_flag close_flag;
17 static struct sync_flag poll_flag;
18 static struct sync_flag aio_done_flag;
19
20 static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
21         (void)mesh;
22         (void)channel;
23         (void)fd;
24         (void)len;
25         (void)priv;
26
27         set_sync_flag(&aio_done_flag, true);
28 }
29
30 static void aio_fd_cb_ignore(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
31         (void)mesh;
32         (void)channel;
33         (void)fd;
34         (void)len;
35         (void)priv;
36 }
37
38 static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
39         (void)mesh;
40         (void)channel;
41         (void)len;
42
43         if(!data) {
44                 set_sync_flag(&close_flag, true);
45                 meshlink_channel_close(mesh, channel);
46         }
47
48         received += len;
49         set_sync_flag(&recv_flag, true);
50 }
51
52 static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
53         (void)port;
54         (void)data;
55         (void)len;
56         meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
57         return true;
58 }
59
60 static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
61         (void)len;
62         meshlink_set_channel_poll_cb(mesh, channel, NULL);
63         set_sync_flag(&poll_flag, true);
64 }
65
66 int main(int argc, char *argv[]) {
67         (void)argc;
68         (void)argv;
69
70         meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
71
72         // Open two new meshlink instance.
73
74         meshlink_handle_t *mesh_a, *mesh_b;
75         open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio_fd");
76
77         // Set the callbacks.
78
79         meshlink_set_channel_accept_cb(mesh_b, accept_cb);
80
81         // Start both instances
82
83         start_meshlink_pair(mesh_a, mesh_b);
84
85         // Open a channel from a to b.
86
87         meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
88         assert(b);
89
90         meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 1, NULL, NULL, 0);
91         assert(channel);
92
93         // Wait for the channel to be fully established
94
95         meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
96         assert(wait_sync_flag(&poll_flag, 10));
97
98         // Create a UNIX stream socket
99
100         int fds[2];
101         assert(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
102         assert(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
103
104         // Enqueue 3 AIO buffers for the same fd
105
106         assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL));
107         assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb_ignore, NULL));
108         assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL));
109
110         // Fill the first buffer with two packets
111
112         char buf[65535] = "";
113
114         sleep(1);
115         assert(write(fds[0], buf, 100) == 100);
116         assert(wait_sync_flag(&recv_flag, 2));
117         assert(received == 100);
118
119         sleep(1);
120         assert(!check_sync_flag(&aio_done_flag));
121         set_sync_flag(&recv_flag, false);
122         assert(write(fds[0], buf, 100) == 100);
123         assert(wait_sync_flag(&recv_flag, 2));
124         assert(received == 200);
125
126         assert(wait_sync_flag(&aio_done_flag, 1));
127         set_sync_flag(&aio_done_flag, false);
128
129         // Fill half of the second buffer
130
131         set_sync_flag(&recv_flag, false);
132         assert(write(fds[0], buf, 100) == 100);
133         assert(wait_sync_flag(&recv_flag, 2));
134         assert(received == 300);
135
136         // Send one packet that spans two AIO buffers
137
138         sleep(1);
139         assert(!check_sync_flag(&aio_done_flag));
140         assert(write(fds[0], buf, 300) == 300);
141         assert(wait_sync_flag(&aio_done_flag, 10));
142
143         // Close the channel and wait for the remaining data
144
145         meshlink_channel_close(mesh_a, channel);
146         assert(wait_sync_flag(&close_flag, 10));
147         assert(received == 600);
148
149         // Create a UDP channel
150
151         channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP);
152         assert(channel);
153
154         // Wait for the channel to be fully established
155
156         received = 0;
157         set_sync_flag(&poll_flag, false);
158         set_sync_flag(&recv_flag, false);
159         set_sync_flag(&close_flag, false);
160         meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
161         assert(wait_sync_flag(&poll_flag, 10));
162
163         // Enqueue a huge AIO buffer
164
165         set_sync_flag(&aio_done_flag, false);
166         assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], -1, aio_fd_cb, NULL));
167
168         // Send a small and a big packets
169
170         assert(write(fds[0], buf, 100) == 100);
171         assert(wait_sync_flag(&recv_flag, 2));
172         assert(received == 100);
173
174         sleep(1);
175         assert(!check_sync_flag(&aio_done_flag));
176         set_sync_flag(&recv_flag, false);
177         assert(write(fds[0], buf, 65535) == 65535);
178         assert(wait_sync_flag(&recv_flag, 2));
179         assert(received == 65635);
180
181         // Close the fds, this should terminate the AIO buffer
182
183         sleep(1);
184         assert(!check_sync_flag(&aio_done_flag));
185         close(fds[0]);
186         assert(wait_sync_flag(&aio_done_flag, 10));
187         close(fds[1]);
188
189         meshlink_channel_close(mesh_a, channel);
190         assert(wait_sync_flag(&close_flag, 10));
191         assert(received == 65635);
192
193         // Clean up.
194
195         close_meshlink_pair(mesh_a, mesh_b);
196 }