]> git.meshlink.io Git - meshlink/blobdiff - src/meshlink.c
Add support for AIO using filedescriptors.
[meshlink] / src / meshlink.c
index c42eb7690543b77037284a22ff678237ff3d9631..42573c8ffa6f68837fb1e06f3a567f0baefbdbd9 100644 (file)
@@ -744,8 +744,9 @@ static bool recvline(meshlink_handle_t *mesh, size_t len) {
 
        return true;
 }
+
 static bool sendline(int fd, char *format, ...) {
-       static char buffer[4096];
+       char buffer[4096];
        char *p = buffer;
        int blen = 0;
        va_list ap;
@@ -1381,16 +1382,16 @@ static void *meshlink_main_loop(void *arg) {
 #ifdef HAVE_SETNS
 
                if(setns(mesh->netns, CLONE_NEWNET) != 0) {
+                       pthread_cond_signal(&mesh->cond);
                        return NULL;
                }
 
 #else
+               pthread_cond_signal(&mesh->cond);
                return NULL;
 #endif // HAVE_SETNS
        }
 
-       pthread_mutex_lock(&(mesh->mesh_mutex));
-
 #if HAVE_CATTA
 
        if(mesh->discovery) {
@@ -1399,10 +1400,15 @@ static void *meshlink_main_loop(void *arg) {
 
 #endif
 
+       pthread_mutex_lock(&(mesh->mesh_mutex));
+
        logger(mesh, MESHLINK_DEBUG, "Starting main_loop...\n");
+       pthread_cond_broadcast(&mesh->cond);
        main_loop(mesh);
        logger(mesh, MESHLINK_DEBUG, "main_loop returned.\n");
 
+       pthread_mutex_unlock(&(mesh->mesh_mutex));
+
 #if HAVE_CATTA
 
        // Stop discovery
@@ -1412,7 +1418,6 @@ static void *meshlink_main_loop(void *arg) {
 
 #endif
 
-       pthread_mutex_unlock(&(mesh->mesh_mutex));
        return NULL;
 }
 
@@ -1429,6 +1434,9 @@ bool meshlink_start(meshlink_handle_t *mesh) {
 
        pthread_mutex_lock(&(mesh->mesh_mutex));
 
+       assert(mesh->self->ecdsa);
+       assert(!memcmp((uint8_t *)mesh->self->ecdsa + 64, (uint8_t *)mesh->private_key + 64, 32));
+
        if(mesh->threadstarted) {
                logger(mesh, MESHLINK_DEBUG, "thread was already running\n");
                pthread_mutex_unlock(&(mesh->mesh_mutex));
@@ -1468,12 +1476,9 @@ bool meshlink_start(meshlink_handle_t *mesh) {
                return false;
        }
 
+       pthread_cond_wait(&mesh->cond, &mesh->mesh_mutex);
        mesh->threadstarted = true;
 
-       assert(mesh->self->ecdsa);
-       assert(!memcmp((uint8_t *)mesh->self->ecdsa + 64, (uint8_t *)mesh->private_key + 64, 32));
-
-
        pthread_mutex_unlock(&(mesh->mesh_mutex));
        return true;
 }
@@ -2802,6 +2807,18 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
        return mesh->channel_accept_cb;
 }
 
+static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
+       if(aio->data) {
+               if(aio->cb.buffer) {
+                       aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+               }
+       } else {
+               if(aio->cb.fd) {
+                       aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+               }
+       }
+}
+
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
@@ -2814,8 +2831,48 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
 
        if(n->status.destroyed) {
                meshlink_channel_close(mesh, channel);
-       } else if(channel->receive_cb) {
-               channel->receive_cb(mesh, channel, data, len);
+               return len;
+       }
+
+       const char *p = data;
+       size_t left = len;
+
+       while(channel->aio_receive) {
+               meshlink_aio_buffer_t *aio = channel->aio_receive;
+               size_t todo = aio->len - aio->done;
+
+               if(todo > left) {
+                       todo = left;
+               }
+
+               if(aio->data) {
+                       memcpy((char *)aio->data + aio->done, p, todo);
+               } else {
+                       ssize_t result = write(aio->fd, p, todo);
+
+                       if(result > 0) {
+                               todo = result;
+                       }
+               }
+
+               aio->done += todo;
+
+               if(aio->done == aio->len) {
+                       channel->aio_receive = aio->next;
+                       aio_signal(mesh, channel, aio);
+                       free(aio);
+               }
+
+               p += todo;
+               left -= todo;
+
+               if(!left && len) {
+                       return len;
+               }
+       }
+
+       if(channel->receive_cb) {
+               channel->receive_cb(mesh, channel, p, left);
        }
 
        return len;
@@ -2885,16 +2942,63 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
        node_t *n = channel->node;
        meshlink_handle_t *mesh = n->mesh;
+       meshlink_aio_buffer_t *aio = channel->aio_send;
+
+       if(aio) {
+               /* We at least one AIO buffer. Send as much as possible form the first buffer. */
+               size_t left = aio->len - aio->done;
+               ssize_t sent;
+
+               if(len > left) {
+                       len = left;
+               }
+
+               if(aio->data) {
+                       sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+               } else {
+                       char buf[65536];
+                       size_t todo = utcp_get_sndbuf_free(connection);
 
-       if(channel->poll_cb) {
-               channel->poll_cb(mesh, channel, len);
+                       if(todo > left) {
+                               todo = left;
+                       }
+
+                       if(todo > sizeof(buf)) {
+                               todo = sizeof(buf);
+                       }
+
+                       ssize_t result = read(aio->fd, buf, todo);
+
+                       if(result > 0) {
+                               sent = utcp_send(connection, buf, result);
+                       } else {
+                               sent = result;
+                       }
+               }
+
+               if(sent >= 0) {
+                       aio->done += sent;
+               }
+
+               /* If the buffer is now completely sent, call the callback and dispose of it. */
+               if(aio->done >= aio->len) {
+                       channel->aio_send = aio->next;
+                       aio_signal(mesh, channel, aio);
+                       free(aio);
+               }
+       } else {
+               if(channel->poll_cb) {
+                       channel->poll_cb(mesh, channel, len);
+               } else {
+                       utcp_set_poll_cb(connection, NULL);
+               }
        }
 }
 
 void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
        (void)mesh;
        channel->poll_cb = cb;
-       utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL);
+       utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
 }
 
 void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
@@ -2916,6 +3020,32 @@ void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_ac
        pthread_mutex_unlock(&mesh->mesh_mutex);
 }
 
+void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+       (void)mesh;
+
+       if(!channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return;
+       }
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+       utcp_set_sndbuf(channel->c, size);
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+       (void)mesh;
+
+       if(!channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return;
+       }
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+       utcp_set_rcvbuf(channel->c, size);
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
 meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
        if(data || len) {
                abort();        // TODO: handle non-NULL data
@@ -2977,6 +3107,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
        }
 
        utcp_close(channel->c);
+
+       /* Clean up any outstanding AIO buffers. */
+       for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
+               next = aio->next;
+               aio_signal(mesh, channel, aio);
+               free(aio);
+       }
+
+       for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
+               next = aio->next;
+               aio_signal(mesh, channel, aio);
+               free(aio);
+       }
+
        free(channel);
 }
 
@@ -3000,8 +3144,17 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
        // Then, preferably only if there is room in the receiver window,
        // kick the meshlink thread to go send packets.
 
+       ssize_t retval;
+
        pthread_mutex_lock(&mesh->mesh_mutex);
-       ssize_t retval = utcp_send(channel->c, data, len);
+
+       /* Disallow direct calls to utcp_send() while we still have AIO active. */
+       if(channel->aio_send) {
+               retval = 0;
+       } else {
+               retval = utcp_send(channel->c, data, len);
+       }
+
        pthread_mutex_unlock(&mesh->mesh_mutex);
 
        if(retval < 0) {
@@ -3011,6 +3164,146 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
        return retval;
 }
 
+bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || !data) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->data = data;
+       aio->len = len;
+       aio->cb.buffer = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_send;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       /* Ensure the poll callback is set, and call it right now to push data if possible */
+       utcp_set_poll_cb(channel->c, channel_poll);
+       channel_poll(channel->c, len);
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || fd == -1) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->fd = fd;
+       aio->len = len;
+       aio->cb.fd = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_send;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       /* Ensure the poll callback is set, and call it right now to push data if possible */
+       utcp_set_poll_cb(channel->c, channel_poll);
+       channel_poll(channel->c, len);
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || !data) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->data = data;
+       aio->len = len;
+       aio->cb.buffer = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || fd == -1) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->fd = fd;
+       aio->len = len;
+       aio->cb.fd = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
 uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
@@ -3112,7 +3405,7 @@ static void __attribute__((destructor)) meshlink_exit(void) {
 }
 
 /// Device class traits
-dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
+const dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
        { .min_connects = 3, .max_connects = 10000, .edge_weight = 1 }, // DEV_CLASS_BACKBONE
        { .min_connects = 3, .max_connects = 100, .edge_weight = 3 },   // DEV_CLASS_STATIONARY
        { .min_connects = 3, .max_connects = 3, .edge_weight = 6 },             // DEV_CLASS_PORTABLE