]> git.meshlink.io Git - meshlink-tiny/blob - test/blackbox/common/mesh_event_handler.c
Fix a possible crash when opening an ephemeral instance.
[meshlink-tiny] / test / blackbox / common / mesh_event_handler.c
1 /*
2     mesh_event_handler.c -- handling of mesh events API
3     Copyright (C) 2018  Guus Sliepen <guus@meshlink.io>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #ifdef NDEBUG
21 #undef NDEBUG
22 #endif
23
24 #define _POSIX_C_SOURCE 200809L
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <sys/types.h>
32 #include <net/if.h>
33 #include <sys/ioctl.h>
34 #include <sys/socket.h>
35 #include <stdbool.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <fcntl.h>
39 #include <time.h>
40 #include <pthread.h>
41 #include "../../../src/meshlink_queue.h"
42 #include "../../utils.h"
43 #include "mesh_event_handler.h"
44
45 #define SERVER_LISTEN_PORT "9000" /* Port number that is binded with mesh event server socket */
46 #define UDP_BUFF_MAX 2000
47
48 const char *event_status[] = {
49         [NODE_STARTED]                          = "Node Started",
50         [NODE_JOINED]                           = "Node Joined",
51         [ERR_NETWORK]                           = "Network Error",
52         [CHANNEL_OPENED]                        = "Channel Opened",
53         [CHANNEL_DATA_RECIEVED]                         = "Channel Data Received",
54         [SIG_ABORT]                             = "SIG_ABORT Received",
55         [MESH_EVENT_COMPLETED]                          = "MESH_EVENT_COMPLETED Received"
56 };
57
58 // TODO: Implement mesh event handling with reentrancy .
59 static struct sockaddr_in server_addr;
60 static int client_fd = -1;
61 static int server_fd = -1;
62 static pthread_t event_receive_thread, event_handle_thread;
63 static meshlink_queue_t event_queue;
64 static bool event_receive_thread_running, event_handle_thread_running;
65 static struct cond_flag sync_event = {.mutex  = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER};
66
67 static void set_cond_flag(struct cond_flag *s, bool flag) {
68         pthread_mutex_lock(&s->mutex);
69         s->flag = flag;
70         pthread_cond_broadcast(&s->cond);
71         pthread_mutex_unlock(&s->mutex);
72 }
73
74 static bool wait_cond_flag(struct cond_flag *s, int seconds) {
75         struct timespec timeout;
76         clock_gettime(CLOCK_REALTIME, &timeout);
77         timeout.tv_sec += seconds;
78
79         pthread_mutex_lock(&s->mutex);
80
81         while(!s->flag)
82                 if(!pthread_cond_timedwait(&s->cond, &s->mutex, &timeout) || errno != EINTR) {
83                         break;
84                 }
85
86         pthread_mutex_unlock(&s->mutex);
87
88         return s->flag;
89 }
90
91 // event_receive_handler running in a separate thread queues all the events received from the UDP port
92 static void *event_receive_handler(void *arg) {
93         (void)arg;
94         size_t recv_ret;
95         char udp_buff[UDP_BUFF_MAX];
96         struct sockaddr client;
97         socklen_t soc_len;
98
99         while(event_receive_thread_running) {
100                 recv_ret = recvfrom(server_fd, udp_buff, sizeof(udp_buff), 0, &client, &soc_len);
101                 assert(recv_ret >= sizeof(mesh_event_payload_t));
102
103                 // Push received mesh event data into the event_queue
104                 mesh_event_payload_t *data = malloc(sizeof(mesh_event_payload_t));
105                 assert(data);
106                 memcpy(data, udp_buff, sizeof(mesh_event_payload_t));
107
108                 // Also receive if there is any payload
109                 if(data->payload_length) {
110                         void *payload_data = malloc(data->payload_length);
111                         assert(payload_data);
112                         memcpy(payload_data, udp_buff + (int)sizeof(mesh_event_payload_t), data->payload_length);
113                         data->payload = payload_data;
114                 } else {
115                         data->payload = NULL;
116                 }
117
118                 // Push the event into the event queue
119                 assert(meshlink_queue_push(&event_queue, data));
120         }
121
122         return NULL;
123 }
124
125 // `event_handler' runs in a separate thread which invokes the event handle callback with
126 // event packet as argument returns from the thread when the callback returns `true' or timeout
127 static void *event_handler(void *argv) {
128         bool callback_return = false;
129         void *data;
130         mesh_event_payload_t mesh_event_rec_packet;
131         mesh_event_callback_t callback = *(mesh_event_callback_t *)argv;
132
133         while(event_handle_thread_running) {
134
135                 // Pops the event if found in the event queue
136                 while((data = meshlink_queue_pop(&event_queue)) != NULL) {
137                         memcpy(&mesh_event_rec_packet, data, sizeof(mesh_event_payload_t));
138                         free(data);
139
140                         // Invokes the callback with the popped event packet
141                         callback_return = callback(mesh_event_rec_packet);
142
143                         if(mesh_event_rec_packet.payload_length) {
144                                 free(mesh_event_rec_packet.payload);
145                         }
146
147                         // Return or Close the event handle thread if callback returns true
148                         if(callback_return) {
149                                 set_cond_flag(&sync_event, true);
150                                 event_handle_thread_running = false;
151                                 break;
152                         }
153                 }
154         }
155
156         return NULL;
157 }
158
159 char *mesh_event_sock_create(const char *if_name) {
160         struct sockaddr_in server = {0};
161         char *ip;
162         struct ifreq req_if = {0};
163         struct sockaddr_in *resp_if_addr;
164
165         assert(if_name);
166         assert(!event_receive_thread_running);
167
168         server_fd = socket(AF_INET, SOCK_DGRAM, 0);
169         assert(server_fd >= 0);
170
171         int reuse = 1;
172         assert(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) != -1);
173
174         req_if.ifr_addr.sa_family = AF_INET;
175         strncpy(req_if.ifr_name, if_name, IFNAMSIZ - 1);
176         assert(ioctl(server_fd, SIOCGIFADDR, &req_if) != -1);
177         resp_if_addr = (struct sockaddr_in *) & (req_if.ifr_addr);
178
179         server.sin_family = AF_INET;
180         server.sin_addr   = resp_if_addr->sin_addr;
181         server.sin_port   = htons(atoi(SERVER_LISTEN_PORT));
182         assert(bind(server_fd, (struct sockaddr *) &server, sizeof(struct sockaddr)) != -1);
183
184         assert((ip = malloc(30)));
185         strncpy(ip, inet_ntoa(resp_if_addr->sin_addr), 20);
186         strcat(ip, ":");
187         strcat(ip, SERVER_LISTEN_PORT);
188
189         meshlink_queue_init(&event_queue);
190         event_receive_thread_running = true;
191         assert(!pthread_create(&event_receive_thread, NULL, event_receive_handler, NULL));
192
193         return ip;
194 }
195
196 void mesh_event_sock_connect(const char *import) {
197         assert(import);
198
199         char *ip = strdup(import);
200         assert(ip);
201         char *port = strchr(ip, ':');
202         assert(port);
203         *port = '\0';
204         port++;
205
206         memset(&server_addr, 0, sizeof(server_addr));
207         server_addr.sin_family      = AF_INET;
208         server_addr.sin_addr.s_addr = inet_addr(ip);
209         server_addr.sin_port        = htons(atoi(port));
210         client_fd = socket(AF_INET, SOCK_DGRAM, 0);
211         free(ip);
212         assert(client_fd >= 0);
213 }
214
215 bool mesh_event_sock_send(int client_id, mesh_event_t event, const void *payload, size_t payload_length) {
216         if(client_fd < 0) {
217                 fprintf(stderr, "mesh_event_sock_send called without calling mesh_event_sock_connect\n");
218                 return false;
219         }
220
221         if(client_id < 0 || event < 0 || event >= MAX_EVENT || (payload == NULL && payload_length)) {
222                 fprintf(stderr, "Invalid parameters\n");
223                 return false;
224         }
225
226         ssize_t send_size = sizeof(mesh_event_payload_t) + payload_length;
227         char *send_packet = malloc(send_size);
228         assert(send_packet);
229         mesh_event_payload_t mesh_event_send_packet;
230
231         mesh_event_send_packet.client_id   = client_id;
232         mesh_event_send_packet.mesh_event  = event;
233         mesh_event_send_packet.payload_length = payload_length;
234         mesh_event_send_packet.payload = NULL;
235         memcpy(send_packet, &mesh_event_send_packet, sizeof(mesh_event_send_packet));
236
237         if(payload_length) {
238                 memcpy(send_packet + sizeof(mesh_event_send_packet), payload, payload_length);
239         }
240
241         ssize_t send_ret = sendto(client_fd, send_packet, send_size, 0, (const struct sockaddr *) &server_addr, sizeof(server_addr));
242         free(send_packet);
243
244         if(send_ret < 0) {
245                 perror("sendto status");
246                 return false;
247         } else {
248                 return true;
249         }
250 }
251
252 bool wait_for_event(mesh_event_callback_t callback, int seconds) {
253         if(callback == NULL || seconds == 0) {
254                 fprintf(stderr, "Invalid parameters\n");
255                 return false;
256         }
257
258         if(event_handle_thread_running) {
259                 fprintf(stderr, "Event handle thread is already running\n");
260                 return false;
261         } else {
262                 event_handle_thread_running = true;
263         }
264
265         set_cond_flag(&sync_event, false);
266         assert(!pthread_create(&event_handle_thread, NULL, event_handler, (void *)&callback));
267         bool wait_ret = wait_cond_flag(&sync_event, seconds);
268         event_handle_thread_running = false;
269         pthread_cancel(event_handle_thread);
270
271         return wait_ret;
272 }
273
274 void mesh_events_flush(void) {
275         mesh_event_payload_t *data;
276
277         while((data = meshlink_queue_pop(&event_queue)) != NULL) {
278                 if(data->payload_length) {
279                         free(data->payload);
280                 }
281
282                 free(data);
283         }
284 }
285
286 void mesh_event_destroy(void) {
287         mesh_events_flush();
288         event_receive_thread_running = false;
289         pthread_cancel(event_receive_thread);
290 }