]> git.meshlink.io Git - meshlink/blob - test/echo-fork.c
Add duplicate node detection callback.
[meshlink] / test / echo-fork.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <unistd.h>
4 #include <stdlib.h>
5 #include <string.h>
6
7 #include "../src/meshlink.h"
8
9 /*
10  * To run this test case, direct a large file to strd
11  */
12
13 volatile bool bar_reachable = false;
14 volatile bool bar_responded = false;
15 volatile bool foo_closed = false;
16 int debug_level;
17
18 void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) {
19         if(mesh) {
20                 fprintf(stderr, "(%s) ", mesh->name);
21         }
22
23         fprintf(stderr, "[%d] %s\n", level, text);
24 }
25
26 void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) {
27         (void)mesh;
28
29         if(!strcmp(node->name, "bar")) {
30                 bar_reachable = reachable;
31         } else if(!strcmp(node->name, "foo"))
32                 if(!reachable) {
33                         foo_closed = true;
34                 }
35 }
36
37 void foo_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
38         (void)mesh;
39         (void)channel;
40         (void)data;
41         (void)len;
42
43         // One way only.
44 }
45
46 void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
47         (void)mesh;
48         (void)channel;
49
50         if(!len) {
51                 fprintf(stderr, "Connection closed by foo\n");
52                 foo_closed = true;
53                 return;
54         }
55
56         // Write data to stdout.
57         write(1, data, len);
58 }
59
60 bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
61         (void)mesh;
62         (void)channel;
63         (void)port;
64         (void)data;
65         (void)len;
66
67         return false;
68 }
69
70 bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
71         if(port != 7) {
72                 return false;
73         }
74
75         meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb);
76
77         if(data) {
78                 bar_receive_cb(mesh, channel, data, len);
79         }
80
81         return true;
82 }
83
84 void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
85         (void)len;
86
87         meshlink_set_channel_poll_cb(mesh, channel, NULL);
88         bar_responded = true;
89 }
90
91 int main1(void) {
92         close(1);
93
94         meshlink_set_log_cb(NULL, debug_level, log_cb);
95
96         meshlink_handle_t *mesh1 = meshlink_open("echo-fork_conf.1", "foo", "echo-fork", DEV_CLASS_BACKBONE);
97
98         if(!mesh1) {
99                 fprintf(stderr, "Could not initialize configuration for foo\n");
100                 return 1;
101         }
102
103         meshlink_set_log_cb(mesh1, debug_level, log_cb);
104         meshlink_set_channel_accept_cb(mesh1, reject_cb);
105         meshlink_set_node_status_cb(mesh1, status_cb);
106
107         if(!meshlink_start(mesh1)) {
108                 fprintf(stderr, "Foo could not start\n");
109                 return 1;
110         }
111
112         for(int i = 0; i < 20; i++) {
113                 sleep(1);
114
115                 if(bar_reachable) {
116                         break;
117                 }
118         }
119
120         if(!bar_reachable) {
121                 fprintf(stderr, "Bar not reachable for foo after 20 seconds\n");
122                 return 1;
123         }
124
125         // Open a channel from foo to bar.
126
127         meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
128
129         if(!bar) {
130                 fprintf(stderr, "Foo could not find bar\n");
131                 return 1;
132         }
133
134         meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, foo_receive_cb, NULL, 0);
135         meshlink_set_channel_poll_cb(mesh1, channel, poll_cb);
136
137         // read and buffer stdin
138         int BUF_SIZE = 1024 * 1024;
139         char buffer[BUF_SIZE];
140
141         for(int i = 0; i < 5; i++) {
142                 sleep(1);
143
144                 if(bar_responded) {
145                         break;
146                 }
147         }
148
149         if(!bar_responded) {
150                 fprintf(stderr, "Bar did not respond to foo's channel message\n");
151                 return 1;
152         }
153
154         do {
155                 //fprintf(stderr, ":");
156                 ssize_t len = read(0, buffer, BUF_SIZE);
157
158                 if(len <= 0) {
159                         break;
160                 }
161
162                 char *p = buffer;
163
164                 while(len > 0) {
165                         ssize_t sent = meshlink_channel_send(mesh1, channel, p, len);
166
167                         if(sent < 0) {
168                                 fprintf(stderr, "Sending message failed\n");
169                                 return 1;
170                         }
171
172                         if(!sent) {
173                                 usleep(100000);
174                         } else {
175                                 len -= sent;
176                                 p += sent;
177                         }
178                 }
179         } while(true);
180
181         fprintf(stderr, "Foo finished sending\n");
182
183         meshlink_channel_close(mesh1, channel);
184         sleep(1);
185
186         // Clean up.
187
188         meshlink_close(mesh1);
189
190         return 0;
191 }
192
193
194 int main2(void) {
195         close(0);
196
197         sleep(1);
198
199         meshlink_set_log_cb(NULL, debug_level, log_cb);
200
201         meshlink_handle_t *mesh2 = meshlink_open("echo-fork_conf.2", "bar", "echo-fork", DEV_CLASS_BACKBONE);
202
203         if(!mesh2) {
204                 fprintf(stderr, "Could not initialize configuration for bar\n");
205                 return 1;
206         }
207
208         meshlink_set_log_cb(mesh2, debug_level, log_cb);
209         meshlink_set_channel_accept_cb(mesh2, accept_cb);
210         meshlink_set_node_status_cb(mesh2, status_cb);
211
212         if(!meshlink_start(mesh2)) {
213                 fprintf(stderr, "Bar could not start\n");
214                 return 1;
215         }
216
217         while(!foo_closed) {
218                 sleep(1);
219         }
220
221         // Clean up.
222
223         meshlink_close(mesh2);
224
225         return 0;
226 }
227
228
229 int main() {
230         debug_level = getenv("DEBUG") ? MESHLINK_DEBUG : MESHLINK_ERROR;
231
232         // Initialize and exchange configuration.
233
234         meshlink_handle_t *foo = meshlink_open("echo-fork_conf.1", "foo", "echo-fork", DEV_CLASS_BACKBONE);
235         meshlink_handle_t *bar = meshlink_open("echo-fork_conf.2", "bar", "echo-fork", DEV_CLASS_BACKBONE);
236         meshlink_add_address(foo, "localhost");
237         meshlink_import(bar, meshlink_export(foo));
238         meshlink_import(foo, meshlink_export(bar));
239         meshlink_close(foo);
240         meshlink_close(bar);
241
242         if(fork()) {
243                 return main1();
244         } else {
245                 return main2();
246         }
247 }