]> git.meshlink.io Git - meshlink/blobdiff - src/meshlink.c
Wake up the MeshLink thread if framed channel data is pending to be flushed.
[meshlink] / src / meshlink.c
index 5815a7772e9e9e5c738dd3f1e94ca140b2d9bbd5..b309d2190eb9b41e1c21d8976193468291cade50 100644 (file)
@@ -1659,7 +1659,12 @@ bool meshlink_start(meshlink_handle_t *mesh) {
 
        event_loop_start(&mesh->loop);
 
-       if(pthread_create(&mesh->thread, NULL, meshlink_main_loop, mesh) != 0) {
+       // Ensure we have a decent amount of stack space. Musl's default of 80 kB is too small.
+       pthread_attr_t attr;
+       pthread_attr_init(&attr);
+       pthread_attr_setstacksize(&attr, 1024 * 1024);
+
+       if(pthread_create(&mesh->thread, &attr, meshlink_main_loop, mesh) != 0) {
                logger(mesh, MESHLINK_DEBUG, "Could not start thread: %s\n", strerror(errno));
                memset(&mesh->thread, 0, sizeof(mesh)->thread);
                meshlink_errno = MESHLINK_EINTERNAL;
@@ -3675,6 +3680,11 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                if(aio->data) {
                        sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
                } else {
+                       /* Limit the amount we read at once to avoid stack overflows */
+                       if(todo > 65536) {
+                               todo = 65536;
+                       }
+
                        char buf[todo];
                        ssize_t result = read(aio->fd, buf, todo);
 
@@ -3701,7 +3711,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
                if(sent != (ssize_t)todo) {
                        /* We should never get a partial send at this point */
-                       assert(sent < 0);
+                       assert(sent <= 0);
 
                        /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
                        if(!aio_abort(mesh, channel, &channel->aio_send)) {
@@ -3896,11 +3906,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
                return -1;
        }
 
-       if(!len) {
-               return 0;
-       }
-
-       if(!data) {
+       if(len && !data) {
                meshlink_errno = MESHLINK_EINVAL;
                return -1;
        }
@@ -3927,6 +3933,10 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
                meshlink_errno = MESHLINK_ENETWORK;
        }
 
+       if(utcp_get_flush_needed(channel->c)) {
+               signal_trigger(&mesh->loop, &mesh->datafromapp);
+       }
+
        return retval;
 }
 
@@ -4135,6 +4145,27 @@ void meshlink_set_node_channel_timeout(meshlink_handle_t *mesh, meshlink_node_t
        pthread_mutex_unlock(&mesh->mutex);
 }
 
+void meshlink_set_node_flush_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) {
+       if(!mesh || !node) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return;
+       }
+
+       node_t *n = (node_t *)node;
+
+       pthread_mutex_lock(&mesh->mutex);
+
+       if(!n->utcp) {
+               n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
+               utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
+               utcp_set_retransmit_cb(n->utcp, channel_retransmit);
+       }
+
+       utcp_set_flush_timeout(n->utcp, timeout);
+
+       pthread_mutex_unlock(&mesh->mutex);
+}
+
 void update_node_status(meshlink_handle_t *mesh, node_t *n) {
        if(n->status.reachable && mesh->channel_accept_cb && !n->utcp) {
                n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);