From 159caac8481fe2cfbf480c1adb668d0ef66c6413 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Thu, 7 Aug 2014 15:41:16 +0200 Subject: [PATCH] Add a public API for the thread-safe message queue. --- src/meshlink.c | 20 ++++-------- src/meshlink_internal.h | 3 +- src/meshlink_queue.h | 71 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 14 deletions(-) create mode 100644 src/meshlink_queue.h diff --git a/src/meshlink.c b/src/meshlink.c index 3def13fd..08aee7a9 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -772,7 +772,6 @@ meshlink_handle_t *meshlink_open_with_size(const char *confbase, const char *nam meshlink_handle_t *mesh = xzalloc(size); mesh->confbase = xstrdup(confbase); if (usingname) mesh->name = xstrdup(name); - pthread_mutex_init ( &(mesh->outpacketqueue_mutex), NULL); pthread_mutex_init ( &(mesh->nodes_mutex), NULL); mesh->threadstarted = false; event_loop_init(&mesh->loop); @@ -957,19 +956,15 @@ bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const return false; } - /* If there is no outgoing list yet, create one. */ - - if(!mesh->outpacketqueue) - mesh->outpacketqueue = list_alloc(NULL); - //add packet to the queue outpacketqueue_t *packet_in_queue = xzalloc(sizeof *packet_in_queue); packet_in_queue->destination=destination; packet_in_queue->data=data; packet_in_queue->len=len; - pthread_mutex_lock(&(mesh->outpacketqueue_mutex)); - list_insert_head(mesh->outpacketqueue,packet_in_queue); - pthread_mutex_unlock(&(mesh->outpacketqueue_mutex)); + if(!meshlink_queue_push(&mesh->outpacketqueue, packet_in_queue)) { + free(packet_in_queue); + return false; + } //notify event loop signal_trigger(&(mesh->loop),&(mesh->datafromapp)); @@ -980,10 +975,9 @@ void meshlink_send_from_queue(event_loop_t* el,meshlink_handle_t *mesh) { vpn_packet_t packet; meshlink_packethdr_t *hdr = (meshlink_packethdr_t *)packet.data; - outpacketqueue_t* p = list_get_tail(mesh->outpacketqueue); - if (p) - list_delete_tail(mesh->outpacketqueue); - else return ; + outpacketqueue_t* p = meshlink_queue_pop(&mesh->outpacketqueue); + if(!p) + return; if (sizeof(meshlink_packethdr_t) + p->len > MAXSIZE) { //log something diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index 1bf3085c..c9c1eeaa 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -26,6 +26,7 @@ #include "hash.h" #include "logger.h" #include "meshlink.h" +#include "meshlink_queue.h" #include "sockaddr.h" #include "sptps.h" @@ -85,7 +86,7 @@ struct meshlink_handle { struct list_t *connections; struct list_t *outgoings; - struct list_t *outpacketqueue; + meshlink_queue_t outpacketqueue; struct splay_tree_t *past_request_tree; timeout_t past_request_timeout; diff --git a/src/meshlink_queue.h b/src/meshlink_queue.h new file mode 100644 index 00000000..a1f2524e --- /dev/null +++ b/src/meshlink_queue.h @@ -0,0 +1,71 @@ +/* + queue.h -- Thread-safe queue + Copyright (C) 2014 Guus Sliepen + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#ifndef MESHLINK_QUEUE_H +#define MESHLINK_QUEUE_H + +#include +#include +#include +#include + +typedef struct meshlink_queue { + struct meshlink_queue_item *head; + struct meshlink_queue_item *tail; + pthread_mutex_t mutex; +} meshlink_queue_t; + +typedef struct meshlink_queue_item { + void *data; + struct meshlink_queue_item *next; +} meshlink_queue_item_t; + +static inline bool meshlink_queue_push(meshlink_queue_t *queue, void *data) { + meshlink_queue_item_t *item = malloc(sizeof *item); + fprintf(stderr, "Pushing %p %p %p\n", queue, item, data); + if(!item) + return false; + item->data = data; + item->next = NULL; + pthread_mutex_lock(&queue->mutex); + if(!queue->tail) + queue->head = queue->tail = item; + else + queue->tail = queue->tail->next = item; + pthread_mutex_unlock(&queue->mutex); + return true; +} + +static inline void *meshlink_queue_pop(meshlink_queue_t *queue) { + meshlink_queue_item_t *item; + void *data; + pthread_mutex_lock(&queue->mutex); + if((item = queue->head)) { + queue->head = item->next; + if(!queue->head) + queue->tail = NULL; + } + pthread_mutex_unlock(&queue->mutex); + data = item ? item->data : NULL; + fprintf(stderr, "Popping %p %p %p\n", queue, item, data); + free(item); + return data; +} + +#endif -- 2.39.5