]> git.meshlink.io Git - meshlink/commitdiff
Handle meshlink_channel_close() being called in callbacks. fix/channel-close-in-callback
authorGuus Sliepen <guus@meshlink.io>
Fri, 8 May 2020 10:48:44 +0000 (12:48 +0200)
committerGuus Sliepen <guus@meshlink.io>
Fri, 8 May 2020 10:48:44 +0000 (12:48 +0200)
When it's called in a callback, we can't free the channel until the
function that called the callback has a chance to safely complete. This
is not a problem for regular receive and poll callbacks, but it is for AIO,
where there can be multiple outstanding AIO buffers that each need their
callback called to signal completion, and each of them could potentially
call meshlink_channel_close().

This also ensures that when the channel is explicitly closed by the
application, it will not receive any further callbacks.

src/meshlink.c
src/meshlink_internal.h

index 80bcc9704480afc84c2bbffe2203db85d2a93ac3..1118b8a5e6b31b5d24cda596b66117099fea661a 100644 (file)
@@ -3455,25 +3455,46 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
        return mesh->channel_accept_cb;
 }
 
-static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
-       if(aio->data) {
-               if(aio->cb.buffer) {
-                       aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+       meshlink_aio_buffer_t *aio = *head;
+       *head = aio->next;
+
+       if(channel->c) {
+               channel->in_callback = true;
+
+               if(aio->data) {
+                       if(aio->cb.buffer) {
+                               aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+                       }
+               } else {
+                       if(aio->cb.fd) {
+                               aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+                       }
                }
-       } else {
-               if(aio->cb.fd) {
-                       aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+
+               channel->in_callback = false;
+
+               if(!channel->c) {
+                       free(aio);
+                       free(channel);
+                       return false;
                }
        }
+
+       free(aio);
+       return true;
 }
 
-static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
-       while(*aio) {
-               meshlink_aio_buffer_t *next = (*aio)->next;
-               aio_signal(mesh, channel, *aio);
-               free(*aio);
-               *aio = next;
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+       while(*head) {
+               if(!aio_finish_one(mesh, channel, head)) {
+                       return false;
+               }
        }
+
+       return true;
 }
 
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
@@ -3497,7 +3518,10 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
        while(channel->aio_receive) {
                if(!len) {
                        /* This receive callback signalled an error, abort all outstanding AIO buffers. */
-                       aio_abort(mesh, channel, &channel->aio_receive);
+                       if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+                               return len;
+                       }
+
                        break;
                }
 
@@ -3516,9 +3540,11 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        if(result <= 0) {
                                /* Writing to fd failed, cancel just this AIO buffer. */
                                logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
-                               channel->aio_receive = aio->next;
-                               aio_signal(mesh, channel, aio);
-                               free(aio);
+
+                               if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+                                       return len;
+                               }
+
                                continue;
                        }
 
@@ -3530,9 +3556,9 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                left -= todo;
 
                if(aio->done == aio->len) {
-                       channel->aio_receive = aio->next;
-                       aio_signal(mesh, channel, aio);
-                       free(aio);
+                       if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+                               return len;
+                       }
                }
 
                if(!left) {
@@ -3626,7 +3652,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
        while(channel->aio_send) {
                if(!len) {
                        /* This poll callback signalled an error, abort all outstanding AIO buffers. */
-                       aio_abort(mesh, channel, &channel->aio_send);
+                       if(!aio_abort(mesh, channel, &channel->aio_send)) {
+                               return;
+                       }
+
                        break;
                }
 
@@ -3654,10 +3683,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                                        logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
                                }
 
-                               channel->aio_send = aio->next;
-                               aio_signal(mesh, channel, aio);
-                               free(aio);
-                               aio = channel->aio_send;
+                               if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+                                       return;
+                               }
+
                                continue;
                        }
                }
@@ -3667,7 +3696,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                        assert(sent < 0);
 
                        /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
-                       aio_abort(mesh, channel, &channel->aio_send);
+                       if(!aio_abort(mesh, channel, &channel->aio_send)) {
+                               return;
+                       }
+
                        len = 0;
                        break;
                }
@@ -3681,9 +3713,9 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                }
 
                /* Signal completion of this buffer, and go to the next one. */
-               channel->aio_send = aio->next;
-               aio_signal(mesh, channel, aio);
-               free(aio);
+               if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+                       return;
+               }
 
                if(!len) {
                        return;
@@ -3834,15 +3866,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
 
        pthread_mutex_lock(&mesh->mutex);
 
-       utcp_close(channel->c);
+       if(channel->c) {
+               utcp_close(channel->c);
+               channel->c = NULL;
 
-       /* Clean up any outstanding AIO buffers. */
-       aio_abort(mesh, channel, &channel->aio_send);
-       aio_abort(mesh, channel, &channel->aio_receive);
+               /* Clean up any outstanding AIO buffers. */
+               aio_abort(mesh, channel, &channel->aio_send);
+               aio_abort(mesh, channel, &channel->aio_receive);
+       }
 
-       pthread_mutex_unlock(&mesh->mutex);
+       if(!channel->in_callback) {
+               free(channel);
+       }
 
-       free(channel);
+       pthread_mutex_unlock(&mesh->mutex);
 }
 
 ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
index 8ba4e9801a6a743e47122fe6332e69baca3f691f..f9225ee5cca65f98ac29a71f0c883d2edfc1bcdf 100644 (file)
@@ -223,6 +223,7 @@ typedef struct meshlink_aio_buffer {
 struct meshlink_channel {
        struct node_t *node;
        void *priv;
+       bool in_callback;
 
        struct utcp_connection *c;
        meshlink_aio_buffer_t *aio_send;