]> git.meshlink.io Git - meshlink/blobdiff - src/meshlink.c
Handle meshlink_channel_close() being called in callbacks.
[meshlink] / src / meshlink.c
index 80bcc9704480afc84c2bbffe2203db85d2a93ac3..1118b8a5e6b31b5d24cda596b66117099fea661a 100644 (file)
@@ -3455,25 +3455,46 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
        return mesh->channel_accept_cb;
 }
 
-static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
-       if(aio->data) {
-               if(aio->cb.buffer) {
-                       aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+       meshlink_aio_buffer_t *aio = *head;
+       *head = aio->next;
+
+       if(channel->c) {
+               channel->in_callback = true;
+
+               if(aio->data) {
+                       if(aio->cb.buffer) {
+                               aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+                       }
+               } else {
+                       if(aio->cb.fd) {
+                               aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+                       }
                }
-       } else {
-               if(aio->cb.fd) {
-                       aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+
+               channel->in_callback = false;
+
+               if(!channel->c) {
+                       free(aio);
+                       free(channel);
+                       return false;
                }
        }
+
+       free(aio);
+       return true;
 }
 
-static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
-       while(*aio) {
-               meshlink_aio_buffer_t *next = (*aio)->next;
-               aio_signal(mesh, channel, *aio);
-               free(*aio);
-               *aio = next;
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+       while(*head) {
+               if(!aio_finish_one(mesh, channel, head)) {
+                       return false;
+               }
        }
+
+       return true;
 }
 
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
@@ -3497,7 +3518,10 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
        while(channel->aio_receive) {
                if(!len) {
                        /* This receive callback signalled an error, abort all outstanding AIO buffers. */
-                       aio_abort(mesh, channel, &channel->aio_receive);
+                       if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+                               return len;
+                       }
+
                        break;
                }
 
@@ -3516,9 +3540,11 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        if(result <= 0) {
                                /* Writing to fd failed, cancel just this AIO buffer. */
                                logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
-                               channel->aio_receive = aio->next;
-                               aio_signal(mesh, channel, aio);
-                               free(aio);
+
+                               if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+                                       return len;
+                               }
+
                                continue;
                        }
 
@@ -3530,9 +3556,9 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                left -= todo;
 
                if(aio->done == aio->len) {
-                       channel->aio_receive = aio->next;
-                       aio_signal(mesh, channel, aio);
-                       free(aio);
+                       if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+                               return len;
+                       }
                }
 
                if(!left) {
@@ -3626,7 +3652,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
        while(channel->aio_send) {
                if(!len) {
                        /* This poll callback signalled an error, abort all outstanding AIO buffers. */
-                       aio_abort(mesh, channel, &channel->aio_send);
+                       if(!aio_abort(mesh, channel, &channel->aio_send)) {
+                               return;
+                       }
+
                        break;
                }
 
@@ -3654,10 +3683,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                                        logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
                                }
 
-                               channel->aio_send = aio->next;
-                               aio_signal(mesh, channel, aio);
-                               free(aio);
-                               aio = channel->aio_send;
+                               if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+                                       return;
+                               }
+
                                continue;
                        }
                }
@@ -3667,7 +3696,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                        assert(sent < 0);
 
                        /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
-                       aio_abort(mesh, channel, &channel->aio_send);
+                       if(!aio_abort(mesh, channel, &channel->aio_send)) {
+                               return;
+                       }
+
                        len = 0;
                        break;
                }
@@ -3681,9 +3713,9 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                }
 
                /* Signal completion of this buffer, and go to the next one. */
-               channel->aio_send = aio->next;
-               aio_signal(mesh, channel, aio);
-               free(aio);
+               if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+                       return;
+               }
 
                if(!len) {
                        return;
@@ -3834,15 +3866,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
 
        pthread_mutex_lock(&mesh->mutex);
 
-       utcp_close(channel->c);
+       if(channel->c) {
+               utcp_close(channel->c);
+               channel->c = NULL;
 
-       /* Clean up any outstanding AIO buffers. */
-       aio_abort(mesh, channel, &channel->aio_send);
-       aio_abort(mesh, channel, &channel->aio_receive);
+               /* Clean up any outstanding AIO buffers. */
+               aio_abort(mesh, channel, &channel->aio_send);
+               aio_abort(mesh, channel, &channel->aio_receive);
+       }
 
-       pthread_mutex_unlock(&mesh->mutex);
+       if(!channel->in_callback) {
+               free(channel);
+       }
 
-       free(channel);
+       pthread_mutex_unlock(&mesh->mutex);
 }
 
 ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {