From a5d9448ef8ec399c9674caa13339c68cabb8c6f9 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Sun, 3 May 2020 22:33:25 +0200 Subject: [PATCH] Channel errors cancel all outstanding AIO buffers. --- src/meshlink.c | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/src/meshlink.c b/src/meshlink.c index 4569c9f5..5072696b 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3467,6 +3467,15 @@ static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, mes } } +static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) { + while(*aio) { + meshlink_aio_buffer_t *next = (*aio)->next; + aio_signal(mesh, channel, *aio); + free(*aio); + *aio = next; + } +} + static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -3486,6 +3495,12 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data size_t left = len; while(channel->aio_receive) { + if(!len) { + /* This receive callback signalled an error, abort all outstanding AIO buffers. */ + aio_abort(mesh, channel, &channel->aio_receive); + break; + } + meshlink_aio_buffer_t *aio = channel->aio_receive; size_t todo = aio->len - aio->done; @@ -3606,6 +3621,17 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { if(aio) { /* We at least one AIO buffer. Send as much as possible form the first buffer. */ size_t left = aio->len - aio->done; + if(!len) { + /* This poll callback signalled an error, abort all outstanding AIO buffers. */ + aio_abort(mesh, channel, &channel->aio_send); + if(channel->poll_cb) { + channel->poll_cb(mesh, channel, 0); + } else { + utcp_set_poll_cb(connection, NULL); + } + return; + } + ssize_t sent; if(len > left) { @@ -3794,17 +3820,8 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel utcp_close(channel->c); /* Clean up any outstanding AIO buffers. */ - for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { - next = aio->next; - aio_signal(mesh, channel, aio); - free(aio); - } - - for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { - next = aio->next; - aio_signal(mesh, channel, aio); - free(aio); - } + aio_abort(mesh, channel, &channel->aio_send); + aio_abort(mesh, channel, &channel->aio_receive); pthread_mutex_unlock(&mesh->mutex); -- 2.39.5