From af8787d1b921fd2e286514ebe83ee1c7b449f298 Mon Sep 17 00:00:00 2001 From: Saverio Proto Date: Wed, 28 May 2014 21:48:46 +0200 Subject: [PATCH] Make meshlink_send return immediately. Implemented a queue for communication between the application and library threads. --- src/event.c | 8 ++++++++ src/event.h | 1 + src/meshlink.c | 40 ++++++++++++++++++++++++++++++++++------ src/meshlink.h | 8 ++++++++ src/meshlink_internal.h | 4 ++++ src/net.c | 4 ++++ 6 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/event.c b/src/event.c index d23f43bb..63657a5f 100644 --- a/src/event.c +++ b/src/event.c @@ -136,6 +136,14 @@ static void pipe_init(event_loop_t *loop) { io_add(loop, &loop->signalio, signalio_handler, NULL, loop->pipefd[0], IO_READ); } +void signal_trigger(event_loop_t *loop, signal_t *sig) { + + uint8_t signum = sig->signum; + write(loop->pipefd[1], &signum, 1); + return; + +} + void signal_add(event_loop_t *loop, signal_t *sig, signal_cb_t cb, void *data, uint8_t signum) { if(sig->cb) return; diff --git a/src/event.h b/src/event.h index e49732af..c90e5067 100644 --- a/src/event.h +++ b/src/event.h @@ -21,6 +21,7 @@ #define __MESHLINK_EVENT_H__ #include "splay_tree.h" +#include "system.h" #define IO_READ 1 #define IO_WRITE 2 diff --git a/src/meshlink.c b/src/meshlink.c index 0195e045..72e27b0f 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -734,6 +734,7 @@ meshlink_handle_t *meshlink_open(const char *confbase, const char *name) { meshlink_handle_t *mesh = xzalloc(sizeof *mesh); mesh->confbase = xstrdup(confbase); mesh->name = xstrdup(name); + pthread_mutex_init ( &(mesh->outpacketqueue_mutex), NULL); event_loop_init(&mesh->loop); mesh->loop.data = mesh; @@ -842,25 +843,52 @@ void meshlink_set_log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, me } bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, unsigned int len) { + + /* If there is no outgoing list yet, create one. */ + + if(!mesh->outpacketqueue) + mesh->outpacketqueue = list_alloc(NULL); + + //add packet to the queue + outpacketqueue_t *packet_in_queue = xzalloc(sizeof *packet_in_queue); + packet_in_queue->destination=destination; + packet_in_queue->data=data; + packet_in_queue->len=len; + pthread_mutex_lock(&(mesh->outpacketqueue_mutex)); + list_insert_head(mesh->outpacketqueue,packet_in_queue); + pthread_mutex_unlock(&(mesh->outpacketqueue_mutex)); + + //notify event loop + signal_trigger(&(mesh->loop),&(mesh->datafromapp)); + return true; +} + +void meshlink_send_from_queue(event_loop_t* el,meshlink_handle_t *mesh) { vpn_packet_t packet; meshlink_packethdr_t *hdr = (meshlink_packethdr_t *)packet.data; - if (sizeof(meshlink_packethdr_t) + len > MAXSIZE) { + + outpacketqueue_t* p = list_get_tail(mesh->outpacketqueue); + if (p) + list_delete_tail(mesh->outpacketqueue); + else return ; + + if (sizeof(meshlink_packethdr_t) + p->len > MAXSIZE) { //log something - return false; + return ; } packet.probe = false; memset(hdr, 0, sizeof *hdr); - memcpy(hdr->destination, destination->name, sizeof hdr->destination); + memcpy(hdr->destination, p->destination->name, sizeof hdr->destination); memcpy(hdr->source, mesh->self->name, sizeof hdr->source); - packet.len = sizeof *hdr + len; - memcpy(packet.data + sizeof *hdr, data, len); + packet.len = sizeof *hdr + p->len; + memcpy(packet.data + sizeof *hdr, p->data, p->len); mesh->self->in_packets++; mesh->self->in_bytes += packet.len; route(mesh, mesh->self, &packet); - return false; + return ; } meshlink_node_t *meshlink_get_node(meshlink_handle_t *mesh, const char *name) { diff --git a/src/meshlink.h b/src/meshlink.h index 36c784c9..f7b3f948 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -22,6 +22,7 @@ #include #include +#include "event.h" #ifdef __cplusplus extern "C" { @@ -39,6 +40,11 @@ typedef enum { MESHLINK_ENOMEM, // Out of memory MESHLINK_ENOENT, // Node is not known } meshlink_errno_t; +typedef struct outpacketqueue { + meshlink_node_t *destination; + const void *data; + unsigned int len; +} outpacketqueue_t; #ifndef MESHLINK_INTERNAL_H @@ -191,6 +197,8 @@ extern void meshlink_set_log_cb(meshlink_handle_t *mesh, meshlink_log_level_t le */ extern bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, unsigned int len); +extern void meshlink_send_from_queue(event_loop_t* el,meshlink_handle_t *mesh); + /// Get a handle for a specific node. /** This function returns a handle for the node with the given name. * diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index ca2e0bc5..c58c4773 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -60,9 +60,11 @@ struct meshlink_handle { meshlink_log_level_t log_level; pthread_t thread; + pthread_mutex_t outpacketqueue_mutex; event_loop_t loop; listen_socket_t listen_socket[MAXSOCKETS]; int listen_sockets; + signal_t datafromapp; struct node_t *self; @@ -73,6 +75,8 @@ struct meshlink_handle { struct list_t *connections; struct list_t *outgoings; + struct list_t *outpacketqueue; + int contradicting_add_edge; int contradicting_del_edge; int sleeptime; diff --git a/src/net.c b/src/net.c index deac6eee..5f50531b 100644 --- a/src/net.c +++ b/src/net.c @@ -269,6 +269,10 @@ int main_loop(meshlink_handle_t *mesh) { timeout_add(&mesh->loop, &mesh->pingtimer, timeout_handler, &mesh->pingtimer, &(struct timeval){mesh->pingtimeout, rand() % 100000}); timeout_add(&mesh->loop, &mesh->periodictimer, periodic_handler, &mesh->periodictimer, &(struct timeval){mesh->pingtimeout, rand() % 100000}); + //Add signal handler + mesh->datafromapp.signum = 0; + signal_add(&(mesh->loop),&(mesh->datafromapp), (signal_cb_t)meshlink_send_from_queue,mesh, mesh->datafromapp.signum); + if(!event_loop_run(&mesh->loop)) { logger(DEBUG_ALWAYS, LOG_ERR, "Error while waiting for input: %s", strerror(errno)); return 1; -- 2.39.5