]> git.meshlink.io Git - meshlink/blob - test/echo-fork.c
Fix the channels-no-partial test case.
[meshlink] / test / echo-fork.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <unistd.h>
4 #include <stdlib.h>
5 #include <string.h>
6
7 #include "../src/meshlink.h"
8
9 /*
10  * To run this test case, direct a large file to strd
11  */
12
13 volatile bool bar_reachable = false;
14 volatile bool bar_responded = false;
15 volatile bool foo_closed = false;
16 int debug_level;
17
18 void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) {
19         if(mesh) {
20                 fprintf(stderr, "(%s) ", mesh->name);
21         }
22
23         fprintf(stderr, "[%d] %s\n", level, text);
24 }
25
26 void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) {
27         (void)mesh;
28
29         if(!strcmp(node->name, "bar")) {
30                 bar_reachable = reachable;
31         } else if(!strcmp(node->name, "foo"))
32                 if(!reachable) {
33                         foo_closed = true;
34                 }
35 }
36
37 void foo_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
38         (void)mesh;
39         (void)channel;
40         (void)data;
41         (void)len;
42
43         // One way only.
44 }
45
46 void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
47         (void)mesh;
48         (void)channel;
49
50         if(!len) {
51                 fprintf(stderr, "Connection closed by foo\n");
52                 foo_closed = true;
53                 return;
54         }
55
56         int ret_val;
57         (void)ret_val;
58         // Write data to stdout.
59         ret_val = write(1, data, len);
60 }
61
62 bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
63         (void)mesh;
64         (void)channel;
65         (void)port;
66         (void)data;
67         (void)len;
68
69         return false;
70 }
71
72 bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
73         if(port != 7) {
74                 return false;
75         }
76
77         meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb);
78
79         if(data) {
80                 bar_receive_cb(mesh, channel, data, len);
81         }
82
83         return true;
84 }
85
86 void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
87         (void)len;
88
89         meshlink_set_channel_poll_cb(mesh, channel, NULL);
90         bar_responded = true;
91 }
92
93 int main1(void) {
94         close(1);
95
96         meshlink_set_log_cb(NULL, debug_level, log_cb);
97
98         meshlink_handle_t *mesh1 = meshlink_open("echo-fork_conf.1", "foo", "echo-fork", DEV_CLASS_BACKBONE);
99
100         if(!mesh1) {
101                 fprintf(stderr, "Could not initialize configuration for foo\n");
102                 return 1;
103         }
104
105         meshlink_set_log_cb(mesh1, debug_level, log_cb);
106         meshlink_set_channel_accept_cb(mesh1, reject_cb);
107         meshlink_set_node_status_cb(mesh1, status_cb);
108
109         if(!meshlink_start(mesh1)) {
110                 fprintf(stderr, "Foo could not start\n");
111                 return 1;
112         }
113
114         for(int i = 0; i < 20; i++) {
115                 sleep(1);
116
117                 if(bar_reachable) {
118                         break;
119                 }
120         }
121
122         if(!bar_reachable) {
123                 fprintf(stderr, "Bar not reachable for foo after 20 seconds\n");
124                 return 1;
125         }
126
127         // Open a channel from foo to bar.
128
129         meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
130
131         if(!bar) {
132                 fprintf(stderr, "Foo could not find bar\n");
133                 return 1;
134         }
135
136         meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, foo_receive_cb, NULL, 0);
137         meshlink_set_channel_poll_cb(mesh1, channel, poll_cb);
138
139         // read and buffer stdin
140         int BUF_SIZE = 1024 * 1024;
141         char buffer[BUF_SIZE];
142
143         for(int i = 0; i < 5; i++) {
144                 sleep(1);
145
146                 if(bar_responded) {
147                         break;
148                 }
149         }
150
151         if(!bar_responded) {
152                 fprintf(stderr, "Bar did not respond to foo's channel message\n");
153                 return 1;
154         }
155
156         do {
157                 //fprintf(stderr, ":");
158                 ssize_t len = read(0, buffer, BUF_SIZE);
159
160                 if(len <= 0) {
161                         break;
162                 }
163
164                 char *p = buffer;
165
166                 while(len > 0) {
167                         ssize_t sent = meshlink_channel_send(mesh1, channel, p, len);
168
169                         if(sent < 0) {
170                                 fprintf(stderr, "Sending message failed\n");
171                                 return 1;
172                         }
173
174                         if(!sent) {
175                                 usleep(100000);
176                         } else {
177                                 len -= sent;
178                                 p += sent;
179                         }
180                 }
181         } while(true);
182
183         fprintf(stderr, "Foo finished sending\n");
184
185         meshlink_channel_close(mesh1, channel);
186         sleep(1);
187
188         // Clean up.
189
190         meshlink_close(mesh1);
191
192         return 0;
193 }
194
195
196 int main2(void) {
197         close(0);
198
199         sleep(1);
200
201         meshlink_set_log_cb(NULL, debug_level, log_cb);
202
203         meshlink_handle_t *mesh2 = meshlink_open("echo-fork_conf.2", "bar", "echo-fork", DEV_CLASS_BACKBONE);
204
205         if(!mesh2) {
206                 fprintf(stderr, "Could not initialize configuration for bar\n");
207                 return 1;
208         }
209
210         meshlink_set_log_cb(mesh2, debug_level, log_cb);
211         meshlink_set_channel_accept_cb(mesh2, accept_cb);
212         meshlink_set_node_status_cb(mesh2, status_cb);
213
214         if(!meshlink_start(mesh2)) {
215                 fprintf(stderr, "Bar could not start\n");
216                 return 1;
217         }
218
219         while(!foo_closed) {
220                 sleep(1);
221         }
222
223         // Clean up.
224
225         meshlink_close(mesh2);
226
227         return 0;
228 }
229
230
231 int main() {
232         debug_level = getenv("DEBUG") ? MESHLINK_DEBUG : MESHLINK_ERROR;
233
234         // Initialize and exchange configuration.
235
236         meshlink_handle_t *foo = meshlink_open("echo-fork_conf.1", "foo", "echo-fork", DEV_CLASS_BACKBONE);
237         meshlink_handle_t *bar = meshlink_open("echo-fork_conf.2", "bar", "echo-fork", DEV_CLASS_BACKBONE);
238         meshlink_add_address(foo, "localhost");
239         meshlink_import(bar, meshlink_export(foo));
240         meshlink_import(foo, meshlink_export(bar));
241         meshlink_close(foo);
242         meshlink_close(bar);
243
244         if(fork()) {
245                 return main1();
246         } else {
247                 return main2();
248         }
249 }