X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=test%2Fblackbox%2Fcommon%2Fmesh_event_handler.c;h=1b7ebe92aec61dd15eee952320313591d7b9f33c;hb=a8a14b2b66b9b4784b7744aa712a50b90abe9d59;hp=0f331d67e00690d787c3c2c00121ae843124ecd7;hpb=dc68da94af8fca91748579c84ef5ed798db7efab;p=meshlink diff --git a/test/blackbox/common/mesh_event_handler.c b/test/blackbox/common/mesh_event_handler.c index 0f331d67..1b7ebe92 100644 --- a/test/blackbox/common/mesh_event_handler.c +++ b/test/blackbox/common/mesh_event_handler.c @@ -30,43 +30,134 @@ #include #include #include +#include +#include "../../../src/meshlink_queue.h" +#include "../../utils.h" #include "mesh_event_handler.h" #define SERVER_LISTEN_PORT "9000" /* Port number that is binded with mesh event server socket */ +#define UDP_BUFF_MAX 2000 -// TODO: Implement mesh event handling with reentrant functions(if required). +// TODO: Implement mesh event handling with reentrancy . static struct sockaddr_in server_addr; static int client_fd = -1; static int server_fd = -1; +static pthread_t event_receive_thread, event_handle_thread; +static meshlink_queue_t event_queue; +static bool event_receive_thread_running, event_handle_thread_running; +static struct cond_flag sync_event = {.mutex = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER}; -char *mesh_event_sock_create(const char *if_name) { - struct sockaddr_in server; - char *ip; - struct ifreq req_if; - struct sockaddr_in *resp_if_addr; +static void set_cond_flag(struct cond_flag *s, bool flag) { + pthread_mutex_lock(&s->mutex); + s->flag = flag; + pthread_cond_broadcast(&s->cond); + pthread_mutex_unlock(&s->mutex); +} + +static bool wait_cond_flag(struct cond_flag *s, int seconds) { + struct timespec timeout; + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_sec += seconds; + + pthread_mutex_lock(&s->mutex); + + while(!s->flag) + if(!pthread_cond_timedwait(&s->cond, &s->mutex, &timeout) || errno != EINTR) { + break; + } + + pthread_mutex_unlock(&s->mutex); + + return s->flag; +} + +// event_receive_handler running in a separate thread queues all the events received from the UDP port +static void *event_receive_handler(void *arg) { + size_t recv_ret; + char udp_buff[UDP_BUFF_MAX]; + struct sockaddr client; + socklen_t soc_len; + + while(event_receive_thread_running) { + recv_ret = recvfrom(server_fd, udp_buff, sizeof(udp_buff), 0, &client, &soc_len); + assert(recv_ret >= sizeof(mesh_event_payload_t)); + + // Push received mesh event data into the event_queue + mesh_event_payload_t *data = malloc(sizeof(mesh_event_payload_t)); + assert(data); + memcpy(data, udp_buff, sizeof(mesh_event_payload_t)); - if(if_name == NULL) { - return NULL; + // Also receive if there is any payload + if(data->payload_length) { + void *payload_data = malloc(data->payload_length); + assert(payload_data); + memcpy(payload_data, udp_buff + (int)sizeof(mesh_event_payload_t), data->payload_length); + data->payload = payload_data; + } else { + data->payload = NULL; + } + + // Push the event into the event queue + assert(meshlink_queue_push(&event_queue, data)); } - server_fd = socket(AF_INET, SOCK_DGRAM, 0); + return NULL; +} + +// `event_handler' runs in a separate thread which invokes the event handle callback with +// event packet as argument returns from the thread when the callback returns `true' or timeout +static void *event_handler(void *argv) { + bool callback_return = false; + void *data; + mesh_event_payload_t mesh_event_rec_packet; + mesh_event_callback_t callback = (mesh_event_callback_t)argv; + + while(event_handle_thread_running) { + + // Pops the event if found in the event queue + while((data = meshlink_queue_pop(&event_queue)) != NULL) { + memcpy(&mesh_event_rec_packet, data, sizeof(mesh_event_payload_t)); + free(data); - if(server_fd < 0) { - perror("socket"); + // Invokes the callback with the popped event packet + callback_return = callback(mesh_event_rec_packet); + + if(mesh_event_rec_packet.payload_length) { + free(mesh_event_rec_packet.payload); + } + + // Return or Close the event handle thread if callback returns true + if(callback_return) { + set_cond_flag(&sync_event, true); + event_handle_thread_running = false; + break; + } + } } + return NULL; +} + +char *mesh_event_sock_create(const char *if_name) { + struct sockaddr_in server = {0}; + char *ip; + struct ifreq req_if = {0}; + struct sockaddr_in *resp_if_addr; + + assert(if_name); + assert(!event_receive_thread_running); + + server_fd = socket(AF_INET, SOCK_DGRAM, 0); assert(server_fd >= 0); int reuse = 1; assert(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) != -1); - memset(&req_if, 0, sizeof(req_if)); req_if.ifr_addr.sa_family = AF_INET; strncpy(req_if.ifr_name, if_name, IFNAMSIZ - 1); assert(ioctl(server_fd, SIOCGIFADDR, &req_if) != -1); resp_if_addr = (struct sockaddr_in *) & (req_if.ifr_addr); - memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_addr = resp_if_addr->sin_addr; server.sin_port = htons(atoi(SERVER_LISTEN_PORT)); @@ -77,16 +168,20 @@ char *mesh_event_sock_create(const char *if_name) { strcat(ip, ":"); strcat(ip, SERVER_LISTEN_PORT); + meshlink_queue_init(&event_queue); + event_receive_thread_running = true; + assert(!pthread_create(&event_receive_thread, NULL, event_receive_handler, NULL)); + return ip; } void mesh_event_sock_connect(const char *import) { - char *port = NULL; - assert(import); char *ip = strdup(import); - assert((port = strchr(ip, ':')) != NULL); + assert(ip); + char *port = strchr(ip, ':'); + assert(port); *port = '\0'; port++; @@ -99,25 +194,34 @@ void mesh_event_sock_connect(const char *import) { assert(client_fd >= 0); } -bool mesh_event_sock_send(int client_id, mesh_event_t event, void *payload, size_t payload_length) { +bool mesh_event_sock_send(int client_id, mesh_event_t event, const void *payload, size_t payload_length) { + if(client_fd < 0) { + fprintf(stderr, "mesh_event_sock_send called without calling mesh_event_sock_connect\n"); + return false; + } + + if(client_id < 0 || event < 0 || event >= MAX_EVENT || (payload == NULL && payload_length)) { + fprintf(stderr, "Invalid parameters\n"); + return false; + } + + ssize_t send_size = sizeof(mesh_event_payload_t) + payload_length; + char *send_packet = malloc(send_size); + assert(send_packet); mesh_event_payload_t mesh_event_send_packet; - ssize_t send_ret; - // Packing the mesh event - assert(client_id >= 0); - assert(client_fd >= 0); - assert(event >= 0 && event < MAX_EVENT); mesh_event_send_packet.client_id = client_id; mesh_event_send_packet.mesh_event = event; + mesh_event_send_packet.payload_length = payload_length; + mesh_event_send_packet.payload = NULL; + memcpy(send_packet, &mesh_event_send_packet, sizeof(mesh_event_send_packet)); - if((payload == NULL) || (payload_length == 0)) { - mesh_event_send_packet.payload_length = 0; - } else { - mesh_event_send_packet.payload_length = payload_length; - memmove(mesh_event_send_packet.payload, payload, payload_length); + if(payload_length) { + memcpy(send_packet + sizeof(mesh_event_send_packet), payload, payload_length); } - send_ret = sendto(client_fd, &mesh_event_send_packet, sizeof(mesh_event_send_packet), 0, (const struct sockaddr *) &server_addr, sizeof(server_addr)); + ssize_t send_ret = sendto(client_fd, send_packet, send_size, 0, (const struct sockaddr *) &server_addr, sizeof(server_addr)); + free(send_packet); if(send_ret < 0) { perror("sendto status"); @@ -127,36 +231,43 @@ bool mesh_event_sock_send(int client_id, mesh_event_t event, void *payload, size } } -bool wait_for_event(mesh_event_callback_t callback, int t) { - struct timeval timeout; - struct sockaddr client; - socklen_t soc_len; - fd_set read_fds; - int activity; - mesh_event_payload_t mesh_event_rec_packet; +bool wait_for_event(mesh_event_callback_t callback, int seconds) { + if(callback == NULL || seconds == 0) { + fprintf(stderr, "Invalid parameters\n"); + return false; + } + + if(event_handle_thread_running) { + fprintf(stderr, "Event handle thread is already running\n"); + return false; + } else { + event_handle_thread_running = true; + } - assert(callback); - assert(server_fd >= -1); - assert(t >= 0); - - timeout.tv_sec = t; - timeout.tv_usec = 0; - FD_ZERO(&read_fds); - FD_SET(server_fd, &read_fds); - - while(1) { - activity = select(server_fd + 1, &read_fds, NULL, NULL, &timeout); - assert(activity != -1); - - if(activity == 0) { - // If no activity happened for the timeout given - return false; - } else if(FD_ISSET(server_fd, &read_fds)) { - // Unpacking the mesh event - ssize_t recv_ret = recvfrom(server_fd, &mesh_event_rec_packet, sizeof(mesh_event_rec_packet), 0, &client, &soc_len); - assert(recv_ret == sizeof(mesh_event_rec_packet)); - callback(mesh_event_rec_packet); - return true; + set_cond_flag(&sync_event, false); + assert(!pthread_create(&event_handle_thread, NULL, event_handler, (void *)callback)); + bool wait_ret = wait_cond_flag(&sync_event, seconds); + event_handle_thread_running = false; + pthread_cancel(event_handle_thread); + + return wait_ret; +} + +void mesh_events_flush(void) { + mesh_event_payload_t *data; + + while((data = meshlink_queue_pop(&event_queue)) != NULL) { + if(data->payload_length) { + free(data->payload); } - }// while + + free(data); + } } + +void mesh_event_destroy(void) { + mesh_events_flush(); + event_receive_thread_running = false; + pthread_cancel(event_receive_thread); +} +