]> git.meshlink.io Git - meshlink/blob - test/echo-fork.c
Updated echo-fork test program.
[meshlink] / test / echo-fork.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <unistd.h>
4 #include <stdlib.h>
5 #include <string.h>
6
7 #include "../src/meshlink.h"
8
9 /*
10  * To run this test case, direct a large file to strd
11  */
12
13 volatile bool bar_reachable = false;
14 volatile bool bar_responded = false;
15
16 void log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, const char *text) {
17         if(mesh)
18                 fprintf(stderr, "(%s) ", mesh->name);
19         fprintf(stderr, "[%d] %s\n", level, text);
20 }
21
22 void status_cb(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) {
23         if(!strcmp(node->name, "bar"))
24                 bar_reachable = reachable;
25 }
26
27 void foo_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
28         //char tmp[len+1];
29         //memset( tmp, 0, sizeof tmp );
30         //snprintf( tmp, len+1, "%s", (char*)data );
31         //fprintf(stderr, "Foo received from Bar:\n%s\n", tmp);
32         //fprintf(stderr, "==============================\n");
33         //fprintf(stdout, "%s", tmp );
34 }
35
36 void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
37         // Echo the data back.
38         char tmp[len+1];
39         memset( tmp, 0, sizeof tmp );
40         snprintf( tmp, len+1, "%s", (char*)data );
41         //fprintf(stderr, "Bar received:\n%s\n", tmp);
42         //fprintf(stderr, "==============================\n");
43         fprintf(stdout, "%s", tmp );
44         //meshlink_channel_send(mesh, channel, data, len);
45 }
46
47 bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
48         return false;
49 }
50
51 bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
52         if(port != 7)
53                 return false;
54         meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb);
55         if(data)
56                 bar_receive_cb(mesh, channel, data, len);
57         return true;
58 }
59
60 void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
61         meshlink_set_channel_poll_cb(mesh, channel, NULL);
62         bar_responded=true;
63 }
64
65 int main1(int rfd, int wfd) {
66         meshlink_set_log_cb(NULL, MESHLINK_DEBUG, log_cb);
67
68         meshlink_handle_t *mesh1 = meshlink_open("channels_conf.1", "foo", "channels", DEV_CLASS_BACKBONE);
69         if(!mesh1) {
70                 fprintf(stderr, "Could not initialize configuration for foo\n");
71                 return 1;
72         }
73
74         meshlink_add_address(mesh1, "localhost");
75
76         char *data = meshlink_export(mesh1);
77         if(!data) {
78                 fprintf(stderr, "Foo could not export its configuration\n");
79                 return 1;
80         }
81
82         size_t len = strlen(data);
83         write(wfd, &len, sizeof len);
84         write(wfd, data, len);
85         free(data);
86
87         read(rfd, &len, sizeof len);
88         char indata[len + 1];
89         read(rfd, indata, len);
90         indata[len] = 0;
91
92         fprintf(stderr, "Foo exchanged data\n");
93
94         meshlink_import(mesh1, indata);
95
96         meshlink_set_channel_accept_cb(mesh1, reject_cb);
97         meshlink_set_node_status_cb(mesh1, status_cb);
98
99         if(!meshlink_start(mesh1)) {
100                 fprintf(stderr, "Foo could not start\n");
101                 return 1;
102         }
103
104         for(int i = 0; i < 20; i++) {
105                 sleep(1);
106                 if(bar_reachable)
107                         break;
108         }
109
110         if(!bar_reachable) {
111                 fprintf(stderr, "Bar not reachable for foo after 20 seconds\n");
112                 return 1;
113         }
114
115         // Open a channel from foo to bar.
116         
117         meshlink_node_t *bar = meshlink_get_node(mesh1, "bar");
118         if(!bar) {
119                 fprintf(stderr, "Foo could not find bar\n");
120                 return 1;
121         }
122
123         meshlink_channel_t *channel = meshlink_channel_open(mesh1, bar, 7, foo_receive_cb, NULL, 0);
124         meshlink_set_channel_poll_cb(mesh1, channel, poll_cb);
125
126         // read and buffer stdin
127         int BUF_SIZE = 1024*1024;
128         char buffer[BUF_SIZE];
129         size_t contentSize = 1;
130         char *content = malloc( sizeof(char) * BUF_SIZE );
131         if (!content) {
132                 fprintf(stderr, "Could not allocate buffer\n");
133         }
134
135         fprintf(stderr, "Foo reading from stdin...\n");
136         content[0] = '\0';
137         while(fgets(buffer,BUF_SIZE,stdin)) {
138                 char *old = content;
139                 contentSize += strlen(buffer);
140                 content = realloc(content, contentSize);
141                 strcat(content,buffer);
142         }
143
144         for(int i = 0; i < 5; i++) {
145                 sleep(1);
146                 if(bar_responded)
147                         break;
148         }
149
150         if(!bar_responded) {
151                 fprintf(stderr, "Bar did not respond to foo's channel message\n");
152                 return 1;
153         }
154
155         //fprintf(stderr, "Foo sending:\n%s", content);
156         //fprintf(stderr, "==============================\n");
157
158         size_t total = 0;
159         while ( total != contentSize )
160         {
161                 size_t to_send = contentSize - total > 2000 ? 2000 : contentSize - total;
162                 ssize_t tmp = meshlink_channel_send(mesh1, channel, content + total, to_send);
163                 if (tmp >= 0)
164                 {
165                         total += tmp;
166                         if (tmp != to_send)
167                                 sleep(1);
168                 } else {
169                         fprintf(stderr, "Sending message failed\n");
170                         return 1;
171                 }
172         }
173         
174         fprintf(stderr, "Foo finished sending\n");
175
176         sleep(30);
177
178         free(content);
179
180         meshlink_channel_close(mesh1, channel);
181
182         // Clean up.
183
184         meshlink_close(mesh1);
185
186         return 0;
187 }
188
189
190 int main2(int rfd, int wfd) {
191         sleep(1);
192
193         meshlink_set_log_cb(NULL, MESHLINK_DEBUG, log_cb);
194
195         meshlink_handle_t *mesh2 = meshlink_open("channels_conf.2", "bar", "channels", DEV_CLASS_BACKBONE);
196         if(!mesh2) {
197                 fprintf(stderr, "Could not initialize configuration for bar\n");
198                 return 1;
199         }
200
201         char *data = meshlink_export(mesh2);
202         if(!data) {
203                 fprintf(stderr, "Bar could not export its configuration\n");
204                 return 1;
205         }
206
207         size_t len = strlen(data);
208         if(write(wfd, &len, sizeof len) <= 0) abort();
209         if(write(wfd, data, len) <= 0) abort();
210         free(data);
211
212         read(rfd, &len, sizeof len);
213         char indata[len + 1];
214         read(rfd, indata, len);
215         indata[len] = 0;
216
217         fprintf(stderr, "Bar exchanged data\n");
218
219         meshlink_import(mesh2, indata);
220
221         meshlink_set_channel_accept_cb(mesh2, accept_cb);
222
223         if(!meshlink_start(mesh2)) {
224                 fprintf(stderr, "Bar could not start\n");
225                 return 1;
226         }
227
228         sleep(30);
229
230         // Clean up.
231
232         meshlink_close(mesh2);
233
234         return 0;
235 }
236
237
238 int main(int argc, char *argv[]) {
239         int fda[2], fdb[2], result;
240
241         pipe2(fda, 0);
242         pipe2(fdb, 0);
243
244         if(fork())
245                 return main1(fda[0], fdb[1]);
246         else
247                 return main2(fdb[0], fda[1]);
248 }