]> git.meshlink.io Git - meshlink/commitdiff
Add a public API for the thread-safe message queue.
authorGuus Sliepen <guus@meshlink.io>
Thu, 7 Aug 2014 13:41:16 +0000 (15:41 +0200)
committerGuus Sliepen <guus@meshlink.io>
Thu, 7 Aug 2014 13:41:16 +0000 (15:41 +0200)
src/meshlink.c
src/meshlink_internal.h
src/meshlink_queue.h [new file with mode: 0644]

index 3def13fdc3d89fab2917736523d5ec9268e43ca4..08aee7a9df3cdc8bdfa75840a5e179720c53c782 100644 (file)
@@ -772,7 +772,6 @@ meshlink_handle_t *meshlink_open_with_size(const char *confbase, const char *nam
        meshlink_handle_t *mesh = xzalloc(size);
        mesh->confbase = xstrdup(confbase);
        if (usingname) mesh->name = xstrdup(name);
-       pthread_mutex_init ( &(mesh->outpacketqueue_mutex), NULL);
        pthread_mutex_init ( &(mesh->nodes_mutex), NULL);
        mesh->threadstarted = false;
        event_loop_init(&mesh->loop);
@@ -957,19 +956,15 @@ bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const
                return false;
        }
 
-       /* If there is no outgoing list yet, create one. */
-
-       if(!mesh->outpacketqueue)
-               mesh->outpacketqueue = list_alloc(NULL);
-
        //add packet to the queue
        outpacketqueue_t *packet_in_queue = xzalloc(sizeof *packet_in_queue);
        packet_in_queue->destination=destination;
        packet_in_queue->data=data;
        packet_in_queue->len=len;
-       pthread_mutex_lock(&(mesh->outpacketqueue_mutex));
-       list_insert_head(mesh->outpacketqueue,packet_in_queue);
-       pthread_mutex_unlock(&(mesh->outpacketqueue_mutex));
+       if(!meshlink_queue_push(&mesh->outpacketqueue, packet_in_queue)) {
+               free(packet_in_queue);
+               return false;
+       }
 
        //notify event loop
        signal_trigger(&(mesh->loop),&(mesh->datafromapp));
@@ -980,10 +975,9 @@ void meshlink_send_from_queue(event_loop_t* el,meshlink_handle_t *mesh) {
        vpn_packet_t packet;
        meshlink_packethdr_t *hdr = (meshlink_packethdr_t *)packet.data;
 
-       outpacketqueue_t* p = list_get_tail(mesh->outpacketqueue);
-       if (p)
-       list_delete_tail(mesh->outpacketqueue);
-       else return ;
+       outpacketqueue_t* p = meshlink_queue_pop(&mesh->outpacketqueue);
+       if(!p)
+               return;
 
        if (sizeof(meshlink_packethdr_t) + p->len > MAXSIZE) {
                //log something
index 1bf3085c92884975e5abe55c29a193f820d707e9..c9c1eeaa38ffa9bd0f5f9bb785afaa9bafeb66c3 100644 (file)
@@ -26,6 +26,7 @@
 #include "hash.h"
 #include "logger.h"
 #include "meshlink.h"
+#include "meshlink_queue.h"
 #include "sockaddr.h"
 #include "sptps.h"
 
@@ -85,7 +86,7 @@ struct meshlink_handle {
        struct list_t *connections;
        struct list_t *outgoings;
 
-       struct list_t *outpacketqueue;
+       meshlink_queue_t outpacketqueue;
 
        struct splay_tree_t *past_request_tree;
        timeout_t past_request_timeout;
diff --git a/src/meshlink_queue.h b/src/meshlink_queue.h
new file mode 100644 (file)
index 0000000..a1f2524
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+    queue.h -- Thread-safe queue
+    Copyright (C) 2014 Guus Sliepen <guus@meshlink.io>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License along
+    with this program; if not, write to the Free Software Foundation, Inc.,
+    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#ifndef MESHLINK_QUEUE_H
+#define MESHLINK_QUEUE_H
+
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <unistd.h>
+
+typedef struct meshlink_queue {
+       struct meshlink_queue_item *head;       
+       struct meshlink_queue_item *tail;       
+       pthread_mutex_t mutex;
+} meshlink_queue_t;
+
+typedef struct meshlink_queue_item {
+       void *data;
+       struct meshlink_queue_item *next;
+} meshlink_queue_item_t;
+
+static inline bool meshlink_queue_push(meshlink_queue_t *queue, void *data) {
+       meshlink_queue_item_t *item = malloc(sizeof *item);
+       fprintf(stderr, "Pushing %p %p %p\n", queue, item, data);
+       if(!item)
+               return false;
+       item->data = data;
+       item->next = NULL;
+       pthread_mutex_lock(&queue->mutex);
+       if(!queue->tail)
+               queue->head = queue->tail = item;
+       else
+               queue->tail = queue->tail->next = item;
+       pthread_mutex_unlock(&queue->mutex);
+       return true;
+}
+
+static inline void *meshlink_queue_pop(meshlink_queue_t *queue) {
+       meshlink_queue_item_t *item;
+       void *data;
+       pthread_mutex_lock(&queue->mutex);
+       if((item = queue->head)) {
+               queue->head = item->next;
+               if(!queue->head)
+                       queue->tail = NULL;
+       }
+       pthread_mutex_unlock(&queue->mutex);
+       data = item ? item->data : NULL;
+       fprintf(stderr, "Popping %p %p %p\n", queue, item, data);
+       free(item);
+       return data;
+}
+
+#endif