From 26fdd4fc9d2a2cc12b0118c3061a65ab3f3ee6c4 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Fri, 8 May 2020 12:48:44 +0200 Subject: [PATCH] Handle meshlink_channel_close() being called in callbacks. When it's called in a callback, we can't free the channel until the function that called the callback has a chance to safely complete. This is not a problem for regular receive and poll callbacks, but it is for AIO, where there can be multiple outstanding AIO buffers that each need their callback called to signal completion, and each of them could potentially call meshlink_channel_close(). This also ensures that when the channel is explicitly closed by the application, it will not receive any further callbacks. --- src/meshlink.c | 107 +++++++++++++++++++++++++++------------- src/meshlink_internal.h | 1 + 2 files changed, 73 insertions(+), 35 deletions(-) diff --git a/src/meshlink.c b/src/meshlink.c index 80bcc970..1118b8a5 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3455,25 +3455,46 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { return mesh->channel_accept_cb; } -static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) { - if(aio->data) { - if(aio->cb.buffer) { - aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); +/* Finish one AIO buffer, return true if the channel is still open. */ +static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { + meshlink_aio_buffer_t *aio = *head; + *head = aio->next; + + if(channel->c) { + channel->in_callback = true; + + if(aio->data) { + if(aio->cb.buffer) { + aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); + } + } else { + if(aio->cb.fd) { + aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + } } - } else { - if(aio->cb.fd) { - aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + + channel->in_callback = false; + + if(!channel->c) { + free(aio); + free(channel); + return false; } } + + free(aio); + return true; } -static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) { - while(*aio) { - meshlink_aio_buffer_t *next = (*aio)->next; - aio_signal(mesh, channel, *aio); - free(*aio); - *aio = next; +/* Finish all AIO buffers, return true if the channel is still open. */ +static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { + while(*head) { + if(!aio_finish_one(mesh, channel, head)) { + return false; + } } + + return true; } static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { @@ -3497,7 +3518,10 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data while(channel->aio_receive) { if(!len) { /* This receive callback signalled an error, abort all outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_receive); + if(!aio_abort(mesh, channel, &channel->aio_receive)) { + return len; + } + break; } @@ -3516,9 +3540,11 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data if(result <= 0) { /* Writing to fd failed, cancel just this AIO buffer. */ logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno)); - channel->aio_receive = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + + if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { + return len; + } + continue; } @@ -3530,9 +3556,9 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data left -= todo; if(aio->done == aio->len) { - channel->aio_receive = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { + return len; + } } if(!left) { @@ -3626,7 +3652,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { while(channel->aio_send) { if(!len) { /* This poll callback signalled an error, abort all outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); + if(!aio_abort(mesh, channel, &channel->aio_send)) { + return; + } + break; } @@ -3654,10 +3683,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno)); } - channel->aio_send = aio->next; - aio_signal(mesh, channel, aio); - free(aio); - aio = channel->aio_send; + if(!aio_finish_one(mesh, channel, &channel->aio_send)) { + return; + } + continue; } } @@ -3667,7 +3696,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { assert(sent < 0); /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */ - aio_abort(mesh, channel, &channel->aio_send); + if(!aio_abort(mesh, channel, &channel->aio_send)) { + return; + } + len = 0; break; } @@ -3681,9 +3713,9 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { } /* Signal completion of this buffer, and go to the next one. */ - channel->aio_send = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + if(!aio_finish_one(mesh, channel, &channel->aio_send)) { + return; + } if(!len) { return; @@ -3834,15 +3866,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel pthread_mutex_lock(&mesh->mutex); - utcp_close(channel->c); + if(channel->c) { + utcp_close(channel->c); + channel->c = NULL; - /* Clean up any outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); - aio_abort(mesh, channel, &channel->aio_receive); + /* Clean up any outstanding AIO buffers. */ + aio_abort(mesh, channel, &channel->aio_send); + aio_abort(mesh, channel, &channel->aio_receive); + } - pthread_mutex_unlock(&mesh->mutex); + if(!channel->in_callback) { + free(channel); + } - free(channel); + pthread_mutex_unlock(&mesh->mutex); } ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index 8ba4e980..f9225ee5 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -223,6 +223,7 @@ typedef struct meshlink_aio_buffer { struct meshlink_channel { struct node_t *node; void *priv; + bool in_callback; struct utcp_connection *c; meshlink_aio_buffer_t *aio_send; -- 2.39.2