]> git.meshlink.io Git - meshlink/commitdiff
Make meshlink_send return immediately. Implemented a queue for communication between...
authorSaverio Proto <zioproto@gmail.com>
Wed, 28 May 2014 19:48:46 +0000 (21:48 +0200)
committerSaverio Proto <zioproto@gmail.com>
Sun, 1 Jun 2014 17:25:23 +0000 (19:25 +0200)
src/event.c
src/event.h
src/meshlink.c
src/meshlink.h
src/meshlink_internal.h
src/net.c

index d23f43bb03c9df683a4ac9a967346f1b7d9aeee5..63657a5fde4d9505079eb34e24c2bf2ee57eb393 100644 (file)
@@ -136,6 +136,14 @@ static void pipe_init(event_loop_t *loop) {
                io_add(loop, &loop->signalio, signalio_handler, NULL, loop->pipefd[0], IO_READ);
 }
 
+void signal_trigger(event_loop_t *loop, signal_t *sig) {
+
+       uint8_t signum = sig->signum;
+       write(loop->pipefd[1], &signum, 1);
+       return;
+
+}
+
 void signal_add(event_loop_t *loop, signal_t *sig, signal_cb_t cb, void *data, uint8_t signum) {
        if(sig->cb)
                return;
index e49732af7062871c4dea91908aad9290e898ed60..c90e50679a171971bde932e51ed7f866d9ceae9e 100644 (file)
@@ -21,6 +21,7 @@
 #define __MESHLINK_EVENT_H__
 
 #include "splay_tree.h"
+#include "system.h"
 
 #define IO_READ 1
 #define IO_WRITE 2
index 0195e04571d4df6e4779a0f8e8f8e60e7ce775e4..72e27b0fe99cf3e956d3d125f093bc020e6c558d 100644 (file)
@@ -734,6 +734,7 @@ meshlink_handle_t *meshlink_open(const char *confbase, const char *name) {
        meshlink_handle_t *mesh = xzalloc(sizeof *mesh);
        mesh->confbase = xstrdup(confbase);
        mesh->name = xstrdup(name);
+       pthread_mutex_init ( &(mesh->outpacketqueue_mutex), NULL);
        event_loop_init(&mesh->loop);
        mesh->loop.data = mesh;
 
@@ -842,25 +843,52 @@ void meshlink_set_log_cb(meshlink_handle_t *mesh, meshlink_log_level_t level, me
 }
 
 bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, unsigned int len) {
+
+       /* If there is no outgoing list yet, create one. */
+
+       if(!mesh->outpacketqueue)
+               mesh->outpacketqueue = list_alloc(NULL);
+
+       //add packet to the queue
+       outpacketqueue_t *packet_in_queue = xzalloc(sizeof *packet_in_queue);
+       packet_in_queue->destination=destination;
+       packet_in_queue->data=data;
+       packet_in_queue->len=len;
+       pthread_mutex_lock(&(mesh->outpacketqueue_mutex));
+       list_insert_head(mesh->outpacketqueue,packet_in_queue);
+       pthread_mutex_unlock(&(mesh->outpacketqueue_mutex));
+
+       //notify event loop
+       signal_trigger(&(mesh->loop),&(mesh->datafromapp));
+       return true;
+}
+
+void meshlink_send_from_queue(event_loop_t* el,meshlink_handle_t *mesh) {
        vpn_packet_t packet;
        meshlink_packethdr_t *hdr = (meshlink_packethdr_t *)packet.data;
-       if (sizeof(meshlink_packethdr_t) + len > MAXSIZE) {
+
+       outpacketqueue_t* p = list_get_tail(mesh->outpacketqueue);
+       if (p)
+       list_delete_tail(mesh->outpacketqueue);
+       else return ;
+
+       if (sizeof(meshlink_packethdr_t) + p->len > MAXSIZE) {
                //log something
-               return false;
+               return ;
        }
 
        packet.probe = false;
        memset(hdr, 0, sizeof *hdr);
-       memcpy(hdr->destination, destination->name, sizeof hdr->destination);
+       memcpy(hdr->destination, p->destination->name, sizeof hdr->destination);
        memcpy(hdr->source, mesh->self->name, sizeof hdr->source);
 
-       packet.len = sizeof *hdr + len;
-       memcpy(packet.data + sizeof *hdr, data, len);
+       packet.len = sizeof *hdr + p->len;
+       memcpy(packet.data + sizeof *hdr, p->data, p->len);
 
         mesh->self->in_packets++;
         mesh->self->in_bytes += packet.len;
         route(mesh, mesh->self, &packet);
-       return false;
+       return ;
 }
 
 meshlink_node_t *meshlink_get_node(meshlink_handle_t *mesh, const char *name) {
index 36c784c9ed9b7a52c7a7421254694856d888b5a6..f7b3f94836734dd8a17b1f9dbc7025fb4d3dc62f 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <stdbool.h>
 #include <stddef.h>
+#include "event.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -39,6 +40,11 @@ typedef enum {
        MESHLINK_ENOMEM, // Out of memory
        MESHLINK_ENOENT, // Node is not known
 } meshlink_errno_t;
+typedef struct outpacketqueue {
+       meshlink_node_t *destination;
+       const void *data;
+       unsigned int len;
+} outpacketqueue_t;
 
 #ifndef MESHLINK_INTERNAL_H
 
@@ -191,6 +197,8 @@ extern void meshlink_set_log_cb(meshlink_handle_t *mesh, meshlink_log_level_t le
  */
 extern bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, unsigned int len);
 
+extern void meshlink_send_from_queue(event_loop_t* el,meshlink_handle_t *mesh);
+
 /// Get a handle for a specific node.
 /** This function returns a handle for the node with the given name.
  *
index ca2e0bc5c4904bf32931375dcd7d8c4356f0e240..c58c477388620198143136e6603e274cbc129a3f 100644 (file)
@@ -60,9 +60,11 @@ struct meshlink_handle {
        meshlink_log_level_t log_level;
 
        pthread_t thread;
+       pthread_mutex_t outpacketqueue_mutex;
        event_loop_t loop;
        listen_socket_t listen_socket[MAXSOCKETS];
        int listen_sockets;
+       signal_t datafromapp;
 
        struct node_t *self;
 
@@ -73,6 +75,8 @@ struct meshlink_handle {
        struct list_t *connections;
        struct list_t *outgoings;
 
+       struct list_t *outpacketqueue;
+
        int contradicting_add_edge;
        int contradicting_del_edge;
        int sleeptime;
index deac6eee42a612a57ef3e621a0eb657f0b924913..5f50531b9ff4fada055e90b255ecf9b9f03ea351 100644 (file)
--- a/src/net.c
+++ b/src/net.c
@@ -269,6 +269,10 @@ int main_loop(meshlink_handle_t *mesh) {
        timeout_add(&mesh->loop, &mesh->pingtimer, timeout_handler, &mesh->pingtimer, &(struct timeval){mesh->pingtimeout, rand() % 100000});
        timeout_add(&mesh->loop, &mesh->periodictimer, periodic_handler, &mesh->periodictimer, &(struct timeval){mesh->pingtimeout, rand() % 100000});
 
+       //Add signal handler
+       mesh->datafromapp.signum = 0;
+       signal_add(&(mesh->loop),&(mesh->datafromapp), (signal_cb_t)meshlink_send_from_queue,mesh, mesh->datafromapp.signum);
+
        if(!event_loop_run(&mesh->loop)) {
                logger(DEBUG_ALWAYS, LOG_ERR, "Error while waiting for input: %s", strerror(errno));
                return 1;