]> git.meshlink.io Git - meshlink/blob - test/echo-fork.c
f5aa04e3134d63b2eb826048cfbe649e771eec30
[meshlink] / test / echo-fork.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <unistd.h>
4 #include <stdlib.h>
5 #include <string.h>
6
7 #include "../src/meshlink.h"
8
9 /*
10  * To run this test case, direct a large file to strd
11  */
12
13 volatile bool bar_reachable = false;
14 volatile bool bar_responded = false;
15 volatile bool foo_closed = false;
16 int debug_level;
17
18 void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) {
19         if(mesh) {
20                 fprintf(stderr, "(%s) ", mesh->name);
21         }
22
23         fprintf(stderr, "[%d] %s\n", level, text);
24 }
25
26 void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) {
27         if(!strcmp(node->name, "bar")) {
28                 bar_reachable = reachable;
29         } else if(!strcmp(node->name, "foo"))
30                 if(!reachable) {
31                         foo_closed = true;
32                 }
33 }
34
35 void foo_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
36         // One way only.
37 }
38
39 void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
40         if(!len) {
41                 fprintf(stderr, "Connection closed by foo\n");
42                 foo_closed = true;
43                 return;
44         }
45
46         // Write data to stdout.
47         write(1, data, len);
48 }
49
50 bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
51         return false;
52 }
53
54 bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
55         if(port != 7) {
56                 return false;
57         }
58
59         meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb);
60
61         if(data) {
62                 bar_receive_cb(mesh, channel, data, len);
63         }
64
65         return true;
66 }
67
68 void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
69         meshlink_set_channel_poll_cb(mesh, channel, NULL);
70         bar_responded = true;
71 }
72
73 int main1(void) {
74         close(1);
75
76         meshlink_set_log_cb(NULL, debug_level, log_cb);
77
78         meshlink_handle_t *mesh1 = meshlink_open("echo-fork_conf.1", "foo", "echo-fork", DEV_CLASS_BACKBONE);
79
80         if(!mesh1) {
81                 fprintf(stderr, "Could not initialize configuration for foo\n");
82                 return 1;
83         }
84
85         meshlink_set_log_cb(mesh1, debug_level, log_cb);
86         meshlink_set_channel_accept_cb(mesh1, reject_cb);
87         meshlink_set_node_status_cb(mesh1, status_cb);
88
89         if(!meshlink_start(mesh1)) {
90                 fprintf(stderr, "Foo could not start\n");
91                 return 1;
92         }
93
94         for(int i = 0; i < 20; i++) {
95                 sleep(1);
96
97                 if(bar_reachable) {
98                         break;
99                 }
100         }
101
102         if(!bar_reachable) {
103                 fprintf(stderr, "Bar not reachable for foo after 20 seconds\n");
104                 return 1;
105         }
106
107         // Open a channel from foo to bar.
108
109         meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
110
111         if(!bar) {
112                 fprintf(stderr, "Foo could not find bar\n");
113                 return 1;
114         }
115
116         meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, foo_receive_cb, NULL, 0);
117         meshlink_set_channel_poll_cb(mesh1, channel, poll_cb);
118
119         // read and buffer stdin
120         int BUF_SIZE = 1024 * 1024;
121         char buffer[BUF_SIZE];
122
123         for(int i = 0; i < 5; i++) {
124                 sleep(1);
125
126                 if(bar_responded) {
127                         break;
128                 }
129         }
130
131         if(!bar_responded) {
132                 fprintf(stderr, "Bar did not respond to foo's channel message\n");
133                 return 1;
134         }
135
136         do {
137                 //fprintf(stderr, ":");
138                 ssize_t len = read(0, buffer, BUF_SIZE);
139
140                 if(len <= 0) {
141                         break;
142                 }
143
144                 char *p = buffer;
145
146                 while(len > 0) {
147                         ssize_t sent = meshlink_channel_send(mesh1, channel, p, len);
148
149                         if(sent < 0) {
150                                 fprintf(stderr, "Sending message failed\n");
151                                 return 1;
152                         }
153
154                         if(!sent) {
155                                 usleep(100000);
156                         } else {
157                                 len -= sent;
158                                 p += sent;
159                         }
160                 }
161         } while(true);
162
163         fprintf(stderr, "Foo finished sending\n");
164
165         meshlink_channel_close(mesh1, channel);
166         sleep(1);
167
168         // Clean up.
169
170         meshlink_close(mesh1);
171
172         return 0;
173 }
174
175
176 int main2(void) {
177         close(0);
178
179         sleep(1);
180
181         meshlink_set_log_cb(NULL, debug_level, log_cb);
182
183         meshlink_handle_t *mesh2 = meshlink_open("echo-fork_conf.2", "bar", "echo-fork", DEV_CLASS_BACKBONE);
184
185         if(!mesh2) {
186                 fprintf(stderr, "Could not initialize configuration for bar\n");
187                 return 1;
188         }
189
190         meshlink_set_log_cb(mesh2, debug_level, log_cb);
191         meshlink_set_channel_accept_cb(mesh2, accept_cb);
192         meshlink_set_node_status_cb(mesh2, status_cb);
193
194         if(!meshlink_start(mesh2)) {
195                 fprintf(stderr, "Bar could not start\n");
196                 return 1;
197         }
198
199         while(!foo_closed) {
200                 sleep(1);
201         }
202
203         // Clean up.
204
205         meshlink_close(mesh2);
206
207         return 0;
208 }
209
210
211 int main(int argc, char *argv[]) {
212         debug_level = getenv("DEBUG") ? MESHLINK_DEBUG : MESHLINK_ERROR;
213
214         // Initialize and exchange configuration.
215
216         meshlink_handle_t *foo = meshlink_open("echo-fork_conf.1", "foo", "echo-fork", DEV_CLASS_BACKBONE);
217         meshlink_handle_t *bar = meshlink_open("echo-fork_conf.2", "bar", "echo-fork", DEV_CLASS_BACKBONE);
218         meshlink_add_address(foo, "localhost");
219         meshlink_import(bar, meshlink_export(foo));
220         meshlink_import(foo, meshlink_export(bar));
221         meshlink_close(foo);
222         meshlink_close(bar);
223
224         if(fork()) {
225                 return main1();
226         } else {
227                 return main2();
228         }
229 }