From a61bd579001e177c7c855bc5665fc138b48011b1 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Sun, 3 May 2020 22:35:58 +0200 Subject: [PATCH] Handle multiple AIO buffers at once in one callback. This also cancels an AIO buffer when there is an error reading from or writing to a filedescriptor. --- src/meshlink.c | 105 ++++++++++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 44 deletions(-) diff --git a/src/meshlink.c b/src/meshlink.c index 5072696b..db958afa 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3513,12 +3513,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data } else { ssize_t result = write(aio->fd, p, todo); - if(result > 0) { - todo = result; + if(result <= 0) { + /* Writing to fd failed, cancel just this AIO buffer. */ + logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno)); + channel->aio_receive = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + continue; } + + todo = result; } aio->done += todo; + p += todo; + left -= todo; if(aio->done == aio->len) { channel->aio_receive = aio->next; @@ -3526,10 +3535,7 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data free(aio); } - p += todo; - left -= todo; - - if(!left && len) { + if(!left) { return len; } } @@ -3616,68 +3622,79 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { node_t *n = channel->node; meshlink_handle_t *mesh = n->mesh; - meshlink_aio_buffer_t *aio = channel->aio_send; - if(aio) { - /* We at least one AIO buffer. Send as much as possible form the first buffer. */ - size_t left = aio->len - aio->done; + while(channel->aio_send) { if(!len) { /* This poll callback signalled an error, abort all outstanding AIO buffers. */ aio_abort(mesh, channel, &channel->aio_send); - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, 0); - } else { - utcp_set_poll_cb(connection, NULL); - } - return; + break; } + /* We have at least one AIO buffer. Send as much as possible from the buffers. */ + meshlink_aio_buffer_t *aio = channel->aio_send; + size_t todo = aio->len - aio->done; ssize_t sent; - if(len > left) { - len = left; + if(todo > len) { + todo = len; } if(aio->data) { - sent = utcp_send(connection, (char *)aio->data + aio->done, len); + sent = utcp_send(connection, (char *)aio->data + aio->done, todo); } else { - char buf[65536]; - size_t todo = utcp_get_sndbuf_free(connection); - - if(todo > left) { - todo = left; - } - - if(todo > sizeof(buf)) { - todo = sizeof(buf); - } - + char buf[todo]; ssize_t result = read(aio->fd, buf, todo); if(result > 0) { - sent = utcp_send(connection, buf, result); + todo = result; + sent = utcp_send(connection, buf, todo); } else { - sent = result; + /* Reading from fd failed, cancel just this AIO buffer. */ + if(result != 0) { + logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno)); + } + + channel->aio_send = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + aio = channel->aio_send; + continue; } } - if(sent >= 0) { - aio->done += sent; + if(sent != (ssize_t)todo) { + /* We should never get a partial send at this point */ + assert(sent < 0); + + /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */ + aio_abort(mesh, channel, &channel->aio_send); + len = 0; + break; } - /* If the buffer is now completely sent, call the callback and dispose of it. */ - if(aio->done >= aio->len) { - channel->aio_send = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + aio->done += sent; + len -= sent; + + /* If we didn't finish this buffer, exit early. */ + if(aio->done < aio->len) { + return; } - } else { - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, len); - } else { - utcp_set_poll_cb(connection, NULL); + + /* Signal completion of this buffer, and go to the next one. */ + channel->aio_send = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + + if(!len) { + return; } } + + if(channel->poll_cb) { + channel->poll_cb(mesh, channel, len); + } else { + utcp_set_poll_cb(connection, NULL); + } } void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { -- 2.39.5