]> git.meshlink.io Git - meshlink/blobdiff - src/meshlink.c
Explicitly set the stack size for the MeshLink thread.
[meshlink] / src / meshlink.c
index 80bcc9704480afc84c2bbffe2203db85d2a93ac3..22f5220d4beebe6c19d6612d45923602e0e78061 100644 (file)
@@ -167,7 +167,7 @@ static int socket_in_netns(int domain, int type, int protocol, int netns) {
 // Find out what local address a socket would use if we connect to the given address.
 // We do this using connect() on a UDP socket, so the kernel has to resolve the address
 // of both endpoints, but this will actually not send any UDP packet.
-static bool getlocaladdr(char *destaddr, sockaddr_t *sa, socklen_t *salen, int netns) {
+static bool getlocaladdr(const char *destaddr, sockaddr_t *sa, socklen_t *salen, int netns) {
        struct addrinfo *rai = NULL;
        const struct addrinfo hint = {
                .ai_family = AF_UNSPEC,
@@ -204,7 +204,7 @@ static bool getlocaladdr(char *destaddr, sockaddr_t *sa, socklen_t *salen, int n
        return true;
 }
 
-static bool getlocaladdrname(char *destaddr, char *host, socklen_t hostlen, int netns) {
+static bool getlocaladdrname(const char *destaddr, char *host, socklen_t hostlen, int netns) {
        sockaddr_t sa;
        socklen_t salen = sizeof(sa);
 
@@ -386,7 +386,7 @@ char *meshlink_get_local_address_for_family(meshlink_handle_t *mesh, int family)
        return xstrdup(localaddr);
 }
 
-void remove_duplicate_hostnames(char *host[], char *port[], int n) {
+static void remove_duplicate_hostnames(char *host[], char *port[], int n) {
        for(int i = 0; i < n; i++) {
                if(!host[i]) {
                        continue;
@@ -710,39 +710,39 @@ static bool finalize_join(join_state_t *state, const void *buf, uint16_t len) {
        // Write host config files
        for(uint32_t i = 0; i < count; i++) {
                const void *data;
-               uint32_t len = packmsg_get_bin_raw(&in, &data);
+               uint32_t data_len = packmsg_get_bin_raw(&in, &data);
 
-               if(!len) {
+               if(!data_len) {
                        logger(mesh, MESHLINK_ERROR, "Incomplete invitation file!\n");
                        return false;
                }
 
-               packmsg_input_t in2 = {data, len};
-               uint32_t version = packmsg_get_uint32(&in2);
-               char *name = packmsg_get_str_dup(&in2);
+               packmsg_input_t in2 = {data, data_len};
+               uint32_t version2 = packmsg_get_uint32(&in2);
+               char *name2 = packmsg_get_str_dup(&in2);
 
-               if(!packmsg_input_ok(&in2) || version != MESHLINK_CONFIG_VERSION || !check_id(name)) {
-                       free(name);
+               if(!packmsg_input_ok(&in2) || version2 != MESHLINK_CONFIG_VERSION || !check_id(name2)) {
+                       free(name2);
                        packmsg_input_invalidate(&in);
                        break;
                }
 
-               if(!check_id(name)) {
-                       free(name);
+               if(!check_id(name2)) {
+                       free(name2);
                        break;
                }
 
-               if(!strcmp(name, mesh->name)) {
+               if(!strcmp(name2, mesh->name)) {
                        logger(mesh, MESHLINK_DEBUG, "Secondary chunk would overwrite our own host config file.\n");
-                       free(name);
+                       free(name2);
                        meshlink_errno = MESHLINK_EPEER;
                        return false;
                }
 
                node_t *n = new_node();
-               n->name = name;
+               n->name = name2;
 
-               config_t config = {data, len};
+               config_t config = {data, data_len};
 
                if(!node_read_from_config(mesh, n, &config)) {
                        free_node(n);
@@ -888,7 +888,7 @@ static bool recvline(join_state_t *state) {
        return true;
 }
 
-static bool sendline(int fd, char *format, ...) {
+static bool sendline(int fd, const char *format, ...) {
        char buffer[4096];
        char *p = buffer;
        int blen = 0;
@@ -1659,7 +1659,12 @@ bool meshlink_start(meshlink_handle_t *mesh) {
 
        event_loop_start(&mesh->loop);
 
-       if(pthread_create(&mesh->thread, NULL, meshlink_main_loop, mesh) != 0) {
+       // Ensure we have a decent amount of stack space. Musl's default of 80 kB is too small.
+       pthread_attr_t attr;
+       pthread_attr_init(&attr);
+       pthread_attr_setstacksize(&attr, 1024 * 1024);
+
+       if(pthread_create(&mesh->thread, &attr, meshlink_main_loop, mesh) != 0) {
                logger(mesh, MESHLINK_DEBUG, "Could not start thread: %s\n", strerror(errno));
                memset(&mesh->thread, 0, sizeof(mesh)->thread);
                meshlink_errno = MESHLINK_EINTERNAL;
@@ -3136,14 +3141,14 @@ bool meshlink_import(meshlink_handle_t *mesh, const char *data) {
        pthread_mutex_lock(&mesh->mutex);
 
        while(count--) {
-               const void *data;
-               uint32_t len = packmsg_get_bin_raw(&in, &data);
+               const void *data2;
+               uint32_t len2 = packmsg_get_bin_raw(&in, &data2);
 
-               if(!len) {
+               if(!len2) {
                        break;
                }
 
-               packmsg_input_t in2 = {data, len};
+               packmsg_input_t in2 = {data2, len2};
                uint32_t version = packmsg_get_uint32(&in2);
                char *name = packmsg_get_str_dup(&in2);
 
@@ -3169,7 +3174,7 @@ bool meshlink_import(meshlink_handle_t *mesh, const char *data) {
                n = new_node();
                n->name = name;
 
-               config_t config = {data, len};
+               config_t config = {data2, len2};
 
                if(!node_read_from_config(mesh, n, &config)) {
                        free_node(n);
@@ -3406,7 +3411,7 @@ bool meshlink_forget_node(meshlink_handle_t *mesh, meshlink_node_t *node) {
        if(mesh->outgoings) {
                for list_each(outgoing_t, outgoing, mesh->outgoings) {
                        if(outgoing->node == n) {
-                               list_delete_node(mesh->outgoings, node);
+                               list_delete_node(mesh->outgoings, list_node);
                        }
                }
        }
@@ -3455,25 +3460,46 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
        return mesh->channel_accept_cb;
 }
 
-static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
-       if(aio->data) {
-               if(aio->cb.buffer) {
-                       aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+       meshlink_aio_buffer_t *aio = *head;
+       *head = aio->next;
+
+       if(channel->c) {
+               channel->in_callback = true;
+
+               if(aio->data) {
+                       if(aio->cb.buffer) {
+                               aio->cb.buffer(mesh, channel, aio->data, aio->done, aio->priv);
+                       }
+               } else {
+                       if(aio->cb.fd) {
+                               aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+                       }
                }
-       } else {
-               if(aio->cb.fd) {
-                       aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+
+               channel->in_callback = false;
+
+               if(!channel->c) {
+                       free(aio);
+                       free(channel);
+                       return false;
                }
        }
+
+       free(aio);
+       return true;
 }
 
-static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
-       while(*aio) {
-               meshlink_aio_buffer_t *next = (*aio)->next;
-               aio_signal(mesh, channel, *aio);
-               free(*aio);
-               *aio = next;
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+       while(*head) {
+               if(!aio_finish_one(mesh, channel, head)) {
+                       return false;
+               }
        }
+
+       return true;
 }
 
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
@@ -3497,7 +3523,10 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
        while(channel->aio_receive) {
                if(!len) {
                        /* This receive callback signalled an error, abort all outstanding AIO buffers. */
-                       aio_abort(mesh, channel, &channel->aio_receive);
+                       if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+                               return len;
+                       }
+
                        break;
                }
 
@@ -3514,11 +3543,17 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        ssize_t result = write(aio->fd, p, todo);
 
                        if(result <= 0) {
+                               if(result < 0 && errno == EINTR) {
+                                       continue;
+                               }
+
                                /* Writing to fd failed, cancel just this AIO buffer. */
                                logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
-                               channel->aio_receive = aio->next;
-                               aio_signal(mesh, channel, aio);
-                               free(aio);
+
+                               if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+                                       return len;
+                               }
+
                                continue;
                        }
 
@@ -3530,9 +3565,9 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                left -= todo;
 
                if(aio->done == aio->len) {
-                       channel->aio_receive = aio->next;
-                       aio_signal(mesh, channel, aio);
-                       free(aio);
+                       if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+                               return len;
+                       }
                }
 
                if(!left) {
@@ -3626,7 +3661,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
        while(channel->aio_send) {
                if(!len) {
                        /* This poll callback signalled an error, abort all outstanding AIO buffers. */
-                       aio_abort(mesh, channel, &channel->aio_send);
+                       if(!aio_abort(mesh, channel, &channel->aio_send)) {
+                               return;
+                       }
+
                        break;
                }
 
@@ -3642,6 +3680,11 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                if(aio->data) {
                        sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
                } else {
+                       /* Limit the amount we read at once to avoid stack overflows */
+                       if(todo > 65536) {
+                               todo = 65536;
+                       }
+
                        char buf[todo];
                        ssize_t result = read(aio->fd, buf, todo);
 
@@ -3649,25 +3692,32 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                                todo = result;
                                sent = utcp_send(connection, buf, todo);
                        } else {
+                               if(result < 0 && errno == EINTR) {
+                                       continue;
+                               }
+
                                /* Reading from fd failed, cancel just this AIO buffer. */
                                if(result != 0) {
                                        logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
                                }
 
-                               channel->aio_send = aio->next;
-                               aio_signal(mesh, channel, aio);
-                               free(aio);
-                               aio = channel->aio_send;
+                               if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+                                       return;
+                               }
+
                                continue;
                        }
                }
 
                if(sent != (ssize_t)todo) {
                        /* We should never get a partial send at this point */
-                       assert(sent < 0);
+                       assert(sent <= 0);
 
                        /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
-                       aio_abort(mesh, channel, &channel->aio_send);
+                       if(!aio_abort(mesh, channel, &channel->aio_send)) {
+                               return;
+                       }
+
                        len = 0;
                        break;
                }
@@ -3681,9 +3731,9 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                }
 
                /* Signal completion of this buffer, and go to the next one. */
-               channel->aio_send = aio->next;
-               aio_signal(mesh, channel, aio);
-               free(aio);
+               if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+                       return;
+               }
 
                if(!len) {
                        return;
@@ -3834,15 +3884,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
 
        pthread_mutex_lock(&mesh->mutex);
 
-       utcp_close(channel->c);
+       if(channel->c) {
+               utcp_close(channel->c);
+               channel->c = NULL;
 
-       /* Clean up any outstanding AIO buffers. */
-       aio_abort(mesh, channel, &channel->aio_send);
-       aio_abort(mesh, channel, &channel->aio_receive);
+               /* Clean up any outstanding AIO buffers. */
+               aio_abort(mesh, channel, &channel->aio_send);
+               aio_abort(mesh, channel, &channel->aio_receive);
+       }
 
-       pthread_mutex_unlock(&mesh->mutex);
+       if(!channel->in_callback) {
+               free(channel);
+       }
 
-       free(channel);
+       pthread_mutex_unlock(&mesh->mutex);
 }
 
 ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
@@ -4236,7 +4291,7 @@ void handle_network_change(meshlink_handle_t *mesh, bool online) {
        retry(mesh);
 }
 
-void call_error_cb(meshlink_handle_t *mesh, meshlink_errno_t meshlink_errno) {
+void call_error_cb(meshlink_handle_t *mesh, meshlink_errno_t cb_errno) {
        // We should only call the callback function if we are in the background thread.
        if(!mesh->error_cb) {
                return;
@@ -4247,7 +4302,7 @@ void call_error_cb(meshlink_handle_t *mesh, meshlink_errno_t meshlink_errno) {
        }
 
        if(mesh->thread == pthread_self()) {
-               mesh->error_cb(mesh, meshlink_errno);
+               mesh->error_cb(mesh, cb_errno);
        }
 }