2 mesh_event_handler.c -- handling of mesh events API
3 Copyright (C) 2018 Guus Sliepen <guus@meshlink.io>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include <netinet/in.h>
23 #include <arpa/inet.h>
24 #include <sys/types.h>
26 #include <sys/ioctl.h>
27 #include <sys/socket.h>
34 #include "../../../src/meshlink_queue.h"
35 #include "../../utils.h"
36 #include "mesh_event_handler.h"
38 #define SERVER_LISTEN_PORT "9000" /* Port number that is binded with mesh event server socket */
39 #define UDP_BUFF_MAX 2000
41 // TODO: Implement mesh event handling with reentrancy .
42 static struct sockaddr_in server_addr;
43 static int client_fd = -1;
44 static int server_fd = -1;
45 static pthread_t event_receive_thread, event_handle_thread;
46 static meshlink_queue_t event_queue;
47 static bool event_receive_thread_running, event_handle_thread_running;
48 static struct cond_flag sync_event = {.mutex = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER};
50 static void set_cond_flag(struct cond_flag *s, bool flag) {
51 pthread_mutex_lock(&s->mutex);
53 pthread_cond_broadcast(&s->cond);
54 pthread_mutex_unlock(&s->mutex);
57 static bool wait_cond_flag(struct cond_flag *s, int seconds) {
58 struct timespec timeout;
59 clock_gettime(CLOCK_REALTIME, &timeout);
60 timeout.tv_sec += seconds;
62 pthread_mutex_lock(&s->mutex);
65 if(!pthread_cond_timedwait(&s->cond, &s->mutex, &timeout) || errno != EINTR) {
69 pthread_mutex_unlock(&s->mutex);
74 // event_receive_handler running in a separate thread queues all the events received from the UDP port
75 static void *event_receive_handler(void *arg) {
77 char udp_buff[UDP_BUFF_MAX];
78 struct sockaddr client;
81 while(event_receive_thread_running) {
82 recv_ret = recvfrom(server_fd, udp_buff, sizeof(udp_buff), 0, &client, &soc_len);
83 assert(recv_ret >= sizeof(mesh_event_payload_t));
85 // Push received mesh event data into the event_queue
86 mesh_event_payload_t *data = malloc(sizeof(mesh_event_payload_t));
88 memcpy(data, udp_buff, sizeof(mesh_event_payload_t));
90 // Also receive if there is any payload
91 if(data->payload_length) {
92 void *payload_data = malloc(data->payload_length);
94 memcpy(payload_data, udp_buff + (int)sizeof(mesh_event_payload_t), data->payload_length);
95 data->payload = payload_data;
100 // Push the event into the event queue
101 assert(meshlink_queue_push(&event_queue, data));
107 // `event_handler' runs in a separate thread which invokes the event handle callback with
108 // event packet as argument returns from the thread when the callback returns `true' or timeout
109 static void *event_handler(void *argv) {
110 bool callback_return = false;
112 mesh_event_payload_t mesh_event_rec_packet;
113 mesh_event_callback_t callback = (mesh_event_callback_t)argv;
115 while(event_handle_thread_running) {
117 // Pops the event if found in the event queue
118 while((data = meshlink_queue_pop(&event_queue)) != NULL) {
119 memcpy(&mesh_event_rec_packet, data, sizeof(mesh_event_payload_t));
122 // Invokes the callback with the popped event packet
123 callback_return = callback(mesh_event_rec_packet);
125 if(mesh_event_rec_packet.payload_length) {
126 free(mesh_event_rec_packet.payload);
129 // Return or Close the event handle thread if callback returns true
130 if(callback_return) {
131 set_cond_flag(&sync_event, true);
132 event_handle_thread_running = false;
141 char *mesh_event_sock_create(const char *if_name) {
142 struct sockaddr_in server = {0};
144 struct ifreq req_if = {0};
145 struct sockaddr_in *resp_if_addr;
148 assert(!event_receive_thread_running);
150 server_fd = socket(AF_INET, SOCK_DGRAM, 0);
151 assert(server_fd >= 0);
154 assert(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) != -1);
156 req_if.ifr_addr.sa_family = AF_INET;
157 strncpy(req_if.ifr_name, if_name, IFNAMSIZ - 1);
158 assert(ioctl(server_fd, SIOCGIFADDR, &req_if) != -1);
159 resp_if_addr = (struct sockaddr_in *) & (req_if.ifr_addr);
161 server.sin_family = AF_INET;
162 server.sin_addr = resp_if_addr->sin_addr;
163 server.sin_port = htons(atoi(SERVER_LISTEN_PORT));
164 assert(bind(server_fd, (struct sockaddr *) &server, sizeof(struct sockaddr)) != -1);
166 assert(ip = malloc(30));
167 strncpy(ip, inet_ntoa(resp_if_addr->sin_addr), 20);
169 strcat(ip, SERVER_LISTEN_PORT);
171 meshlink_queue_init(&event_queue);
172 event_receive_thread_running = true;
173 assert(!pthread_create(&event_receive_thread, NULL, event_receive_handler, NULL));
178 void mesh_event_sock_connect(const char *import) {
181 char *ip = strdup(import);
183 char *port = strchr(ip, ':');
188 memset(&server_addr, 0, sizeof(server_addr));
189 server_addr.sin_family = AF_INET;
190 server_addr.sin_addr.s_addr = inet_addr(ip);
191 server_addr.sin_port = htons(atoi(port));
192 client_fd = socket(AF_INET, SOCK_DGRAM, 0);
194 assert(client_fd >= 0);
197 bool mesh_event_sock_send(int client_id, mesh_event_t event, const void *payload, size_t payload_length) {
199 fprintf(stderr, "mesh_event_sock_send called without calling mesh_event_sock_connect\n");
203 if(client_id < 0 || event < 0 || event >= MAX_EVENT || (payload == NULL && payload_length)) {
204 fprintf(stderr, "Invalid parameters\n");
208 ssize_t send_size = sizeof(mesh_event_payload_t) + payload_length;
209 char *send_packet = malloc(send_size);
211 mesh_event_payload_t mesh_event_send_packet;
213 mesh_event_send_packet.client_id = client_id;
214 mesh_event_send_packet.mesh_event = event;
215 mesh_event_send_packet.payload_length = payload_length;
216 mesh_event_send_packet.payload = NULL;
217 memcpy(send_packet, &mesh_event_send_packet, sizeof(mesh_event_send_packet));
220 memcpy(send_packet + sizeof(mesh_event_send_packet), payload, payload_length);
223 ssize_t send_ret = sendto(client_fd, send_packet, send_size, 0, (const struct sockaddr *) &server_addr, sizeof(server_addr));
227 perror("sendto status");
234 bool wait_for_event(mesh_event_callback_t callback, int seconds) {
235 if(callback == NULL || seconds == 0) {
236 fprintf(stderr, "Invalid parameters\n");
240 if(event_handle_thread_running) {
241 fprintf(stderr, "Event handle thread is already running\n");
244 event_handle_thread_running = true;
247 set_cond_flag(&sync_event, false);
248 assert(!pthread_create(&event_handle_thread, NULL, event_handler, (void *)callback));
249 bool wait_ret = wait_cond_flag(&sync_event, seconds);
250 event_handle_thread_running = false;
251 pthread_cancel(event_handle_thread);
256 void mesh_events_flush(void) {
257 mesh_event_payload_t *data;
259 while((data = meshlink_queue_pop(&event_queue)) != NULL) {
260 if(data->payload_length) {
268 void mesh_event_destroy(void) {
270 event_receive_thread_running = false;
271 pthread_cancel(event_receive_thread);