#include <assert.h>
#include <fcntl.h>
#include <time.h>
+#include <pthread.h>
+#include "../../../src/meshlink_queue.h"
+#include "../../utils.h"
#include "mesh_event_handler.h"
#define SERVER_LISTEN_PORT "9000" /* Port number that is binded with mesh event server socket */
+#define UDP_BUFF_MAX 2000
-// TODO: Implement mesh event handling with reentrant functions(if required).
+const char *event_status[] = {
+ [NODE_STARTED] = "Node Started",
+ [NODE_JOINED] = "Node Joined",
+ [ERR_NETWORK] = "Network Error",
+ [CHANNEL_OPENED] = "Channel Opened",
+ [CHANNEL_DATA_RECIEVED] = "Channel Data Received",
+ [SIG_ABORT] = "SIG_ABORT Received",
+ [MESH_EVENT_COMPLETED] = "MESH_EVENT_COMPLETED Received"
+};
+
+// TODO: Implement mesh event handling with reentrancy .
static struct sockaddr_in server_addr;
static int client_fd = -1;
static int server_fd = -1;
+static pthread_t event_receive_thread, event_handle_thread;
+static meshlink_queue_t event_queue;
+static bool event_receive_thread_running, event_handle_thread_running;
+static struct cond_flag sync_event = {.mutex = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER};
-char *mesh_event_sock_create(const char *if_name) {
- struct sockaddr_in server;
- char *ip;
- struct ifreq req_if;
- struct sockaddr_in *resp_if_addr;
+static void set_cond_flag(struct cond_flag *s, bool flag) {
+ pthread_mutex_lock(&s->mutex);
+ s->flag = flag;
+ pthread_cond_broadcast(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
+}
+
+static bool wait_cond_flag(struct cond_flag *s, int seconds) {
+ struct timespec timeout;
+ clock_gettime(CLOCK_REALTIME, &timeout);
+ timeout.tv_sec += seconds;
+
+ pthread_mutex_lock(&s->mutex);
+
+ while(!s->flag)
+ if(!pthread_cond_timedwait(&s->cond, &s->mutex, &timeout) || errno != EINTR) {
+ break;
+ }
+
+ pthread_mutex_unlock(&s->mutex);
+
+ return s->flag;
+}
+
+// event_receive_handler running in a separate thread queues all the events received from the UDP port
+static void *event_receive_handler(void *arg) {
+ (void)arg;
+ size_t recv_ret;
+ char udp_buff[UDP_BUFF_MAX];
+ struct sockaddr client;
+ socklen_t soc_len;
+
+ while(event_receive_thread_running) {
+ recv_ret = recvfrom(server_fd, udp_buff, sizeof(udp_buff), 0, &client, &soc_len);
+ assert(recv_ret >= sizeof(mesh_event_payload_t));
- if(if_name == NULL) {
- return NULL;
+ // Push received mesh event data into the event_queue
+ mesh_event_payload_t *data = malloc(sizeof(mesh_event_payload_t));
+ assert(data);
+ memcpy(data, udp_buff, sizeof(mesh_event_payload_t));
+
+ // Also receive if there is any payload
+ if(data->payload_length) {
+ void *payload_data = malloc(data->payload_length);
+ assert(payload_data);
+ memcpy(payload_data, udp_buff + (int)sizeof(mesh_event_payload_t), data->payload_length);
+ data->payload = payload_data;
+ } else {
+ data->payload = NULL;
+ }
+
+ // Push the event into the event queue
+ assert(meshlink_queue_push(&event_queue, data));
}
- server_fd = socket(AF_INET, SOCK_DGRAM, 0);
+ return NULL;
+}
+
+// `event_handler' runs in a separate thread which invokes the event handle callback with
+// event packet as argument returns from the thread when the callback returns `true' or timeout
+static void *event_handler(void *argv) {
+ bool callback_return = false;
+ void *data;
+ mesh_event_payload_t mesh_event_rec_packet;
+ mesh_event_callback_t callback = *(mesh_event_callback_t *)argv;
+
+ while(event_handle_thread_running) {
- if(server_fd < 0) {
- perror("socket");
+ // Pops the event if found in the event queue
+ while((data = meshlink_queue_pop(&event_queue)) != NULL) {
+ memcpy(&mesh_event_rec_packet, data, sizeof(mesh_event_payload_t));
+ free(data);
+
+ // Invokes the callback with the popped event packet
+ callback_return = callback(mesh_event_rec_packet);
+
+ if(mesh_event_rec_packet.payload_length) {
+ free(mesh_event_rec_packet.payload);
+ }
+
+ // Return or Close the event handle thread if callback returns true
+ if(callback_return) {
+ set_cond_flag(&sync_event, true);
+ event_handle_thread_running = false;
+ break;
+ }
+ }
}
+ return NULL;
+}
+
+char *mesh_event_sock_create(const char *if_name) {
+ struct sockaddr_in server = {0};
+ char *ip;
+ struct ifreq req_if = {0};
+ struct sockaddr_in *resp_if_addr;
+
+ assert(if_name);
+ assert(!event_receive_thread_running);
+
+ server_fd = socket(AF_INET, SOCK_DGRAM, 0);
assert(server_fd >= 0);
int reuse = 1;
assert(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) != -1);
- memset(&req_if, 0, sizeof(req_if));
req_if.ifr_addr.sa_family = AF_INET;
strncpy(req_if.ifr_name, if_name, IFNAMSIZ - 1);
assert(ioctl(server_fd, SIOCGIFADDR, &req_if) != -1);
resp_if_addr = (struct sockaddr_in *) & (req_if.ifr_addr);
- memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr = resp_if_addr->sin_addr;
server.sin_port = htons(atoi(SERVER_LISTEN_PORT));
assert(bind(server_fd, (struct sockaddr *) &server, sizeof(struct sockaddr)) != -1);
- assert(ip = malloc(30));
+ assert((ip = malloc(30)));
strncpy(ip, inet_ntoa(resp_if_addr->sin_addr), 20);
strcat(ip, ":");
strcat(ip, SERVER_LISTEN_PORT);
+ meshlink_queue_init(&event_queue);
+ event_receive_thread_running = true;
+ assert(!pthread_create(&event_receive_thread, NULL, event_receive_handler, NULL));
+
return ip;
}
void mesh_event_sock_connect(const char *import) {
- char *port = NULL;
-
assert(import);
char *ip = strdup(import);
- assert((port = strchr(ip, ':')) != NULL);
+ assert(ip);
+ char *port = strchr(ip, ':');
+ assert(port);
*port = '\0';
port++;
assert(client_fd >= 0);
}
-bool mesh_event_sock_send(int client_id, mesh_event_t event, void *payload, size_t payload_length) {
+bool mesh_event_sock_send(int client_id, mesh_event_t event, const void *payload, size_t payload_length) {
+ if(client_fd < 0) {
+ fprintf(stderr, "mesh_event_sock_send called without calling mesh_event_sock_connect\n");
+ return false;
+ }
+
+ if(client_id < 0 || event < 0 || event >= MAX_EVENT || (payload == NULL && payload_length)) {
+ fprintf(stderr, "Invalid parameters\n");
+ return false;
+ }
+
+ ssize_t send_size = sizeof(mesh_event_payload_t) + payload_length;
+ char *send_packet = malloc(send_size);
+ assert(send_packet);
mesh_event_payload_t mesh_event_send_packet;
- ssize_t send_ret;
- // Packing the mesh event
- assert(client_id >= 0);
- assert(client_fd >= 0);
- assert(event >= 0 && event < MAX_EVENT);
mesh_event_send_packet.client_id = client_id;
mesh_event_send_packet.mesh_event = event;
+ mesh_event_send_packet.payload_length = payload_length;
+ mesh_event_send_packet.payload = NULL;
+ memcpy(send_packet, &mesh_event_send_packet, sizeof(mesh_event_send_packet));
- if((payload == NULL) || (payload_length == 0)) {
- mesh_event_send_packet.payload_length = 0;
- } else {
- mesh_event_send_packet.payload_length = payload_length;
- memmove(mesh_event_send_packet.payload, payload, payload_length);
+ if(payload_length) {
+ memcpy(send_packet + sizeof(mesh_event_send_packet), payload, payload_length);
}
- send_ret = sendto(client_fd, &mesh_event_send_packet, sizeof(mesh_event_send_packet), 0, (const struct sockaddr *) &server_addr, sizeof(server_addr));
+ ssize_t send_ret = sendto(client_fd, send_packet, send_size, 0, (const struct sockaddr *) &server_addr, sizeof(server_addr));
+ free(send_packet);
if(send_ret < 0) {
perror("sendto status");
}
}
-bool wait_for_event(mesh_event_callback_t callback, int t) {
- struct timeval timeout;
- struct sockaddr client;
- socklen_t soc_len;
- fd_set read_fds;
- int activity;
- mesh_event_payload_t mesh_event_rec_packet;
+bool wait_for_event(mesh_event_callback_t callback, int seconds) {
+ if(callback == NULL || seconds == 0) {
+ fprintf(stderr, "Invalid parameters\n");
+ return false;
+ }
+
+ if(event_handle_thread_running) {
+ fprintf(stderr, "Event handle thread is already running\n");
+ return false;
+ } else {
+ event_handle_thread_running = true;
+ }
+
+ set_cond_flag(&sync_event, false);
+ assert(!pthread_create(&event_handle_thread, NULL, event_handler, (void *)&callback));
+ bool wait_ret = wait_cond_flag(&sync_event, seconds);
+ event_handle_thread_running = false;
+ pthread_cancel(event_handle_thread);
- assert(callback);
- assert(server_fd >= -1);
- assert(t >= 0);
-
- timeout.tv_sec = t;
- timeout.tv_usec = 0;
- FD_ZERO(&read_fds);
- FD_SET(server_fd, &read_fds);
-
- while(1) {
- activity = select(server_fd + 1, &read_fds, NULL, NULL, &timeout);
- assert(activity != -1);
-
- if(activity == 0) {
- // If no activity happened for the timeout given
- return false;
- } else if(FD_ISSET(server_fd, &read_fds)) {
- // Unpacking the mesh event
- ssize_t recv_ret = recvfrom(server_fd, &mesh_event_rec_packet, sizeof(mesh_event_rec_packet), 0, &client, &soc_len);
- assert(recv_ret == sizeof(mesh_event_rec_packet));
- callback(mesh_event_rec_packet);
- return true;
+ return wait_ret;
+}
+
+void mesh_events_flush(void) {
+ mesh_event_payload_t *data;
+
+ while((data = meshlink_queue_pop(&event_queue)) != NULL) {
+ if(data->payload_length) {
+ free(data->payload);
}
- }// while
+
+ free(data);
+ }
+}
+
+void mesh_event_destroy(void) {
+ mesh_events_flush();
+ event_receive_thread_running = false;
+ pthread_cancel(event_receive_thread);
}