From: Guus Sliepen Date: Tue, 28 Apr 2020 20:21:34 +0000 (+0200) Subject: Several fixes for channel AIO send and receive functions. X-Git-Url: http://git.meshlink.io/?p=meshlink;a=commitdiff_plain;h=f3e15df2c965d014a3281416e502be96170063e8;hp=1b0f134888b1d6c9acda938fb654cd4dfd295167 Several fixes for channel AIO send and receive functions. - Process multiple buffers if possible - Better handling error conditions - fd errors now cancel the AIO buffer - channel errors cancel all outstanding AIO buffers - Don't call the poll callback with a length larger than the remaining UTCP send buffer. --- diff --git a/src/meshlink.c b/src/meshlink.c index 4569c9f5..80bcc970 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3467,6 +3467,15 @@ static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, mes } } +static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) { + while(*aio) { + meshlink_aio_buffer_t *next = (*aio)->next; + aio_signal(mesh, channel, *aio); + free(*aio); + *aio = next; + } +} + static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -3486,6 +3495,12 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data size_t left = len; while(channel->aio_receive) { + if(!len) { + /* This receive callback signalled an error, abort all outstanding AIO buffers. */ + aio_abort(mesh, channel, &channel->aio_receive); + break; + } + meshlink_aio_buffer_t *aio = channel->aio_receive; size_t todo = aio->len - aio->done; @@ -3498,12 +3513,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data } else { ssize_t result = write(aio->fd, p, todo); - if(result > 0) { - todo = result; + if(result <= 0) { + /* Writing to fd failed, cancel just this AIO buffer. */ + logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno)); + channel->aio_receive = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + continue; } + + todo = result; } aio->done += todo; + p += todo; + left -= todo; if(aio->done == aio->len) { channel->aio_receive = aio->next; @@ -3511,10 +3535,7 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data free(aio); } - p += todo; - left -= todo; - - if(!left && len) { + if(!left) { return len; } } @@ -3601,57 +3622,79 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { node_t *n = channel->node; meshlink_handle_t *mesh = n->mesh; - meshlink_aio_buffer_t *aio = channel->aio_send; - if(aio) { - /* We at least one AIO buffer. Send as much as possible form the first buffer. */ - size_t left = aio->len - aio->done; + while(channel->aio_send) { + if(!len) { + /* This poll callback signalled an error, abort all outstanding AIO buffers. */ + aio_abort(mesh, channel, &channel->aio_send); + break; + } + + /* We have at least one AIO buffer. Send as much as possible from the buffers. */ + meshlink_aio_buffer_t *aio = channel->aio_send; + size_t todo = aio->len - aio->done; ssize_t sent; - if(len > left) { - len = left; + if(todo > len) { + todo = len; } if(aio->data) { - sent = utcp_send(connection, (char *)aio->data + aio->done, len); + sent = utcp_send(connection, (char *)aio->data + aio->done, todo); } else { - char buf[65536]; - size_t todo = utcp_get_sndbuf_free(connection); - - if(todo > left) { - todo = left; - } - - if(todo > sizeof(buf)) { - todo = sizeof(buf); - } - + char buf[todo]; ssize_t result = read(aio->fd, buf, todo); if(result > 0) { - sent = utcp_send(connection, buf, result); + todo = result; + sent = utcp_send(connection, buf, todo); } else { - sent = result; + /* Reading from fd failed, cancel just this AIO buffer. */ + if(result != 0) { + logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno)); + } + + channel->aio_send = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + aio = channel->aio_send; + continue; } } - if(sent >= 0) { - aio->done += sent; + if(sent != (ssize_t)todo) { + /* We should never get a partial send at this point */ + assert(sent < 0); + + /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */ + aio_abort(mesh, channel, &channel->aio_send); + len = 0; + break; } - /* If the buffer is now completely sent, call the callback and dispose of it. */ - if(aio->done >= aio->len) { - channel->aio_send = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + aio->done += sent; + len -= sent; + + /* If we didn't finish this buffer, exit early. */ + if(aio->done < aio->len) { + return; } - } else { - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, len); - } else { - utcp_set_poll_cb(connection, NULL); + + /* Signal completion of this buffer, and go to the next one. */ + channel->aio_send = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + + if(!len) { + return; } } + + if(channel->poll_cb) { + channel->poll_cb(mesh, channel, len); + } else { + utcp_set_poll_cb(connection, NULL); + } } void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { @@ -3794,17 +3837,8 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel utcp_close(channel->c); /* Clean up any outstanding AIO buffers. */ - for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { - next = aio->next; - aio_signal(mesh, channel, aio); - free(aio); - } - - for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { - next = aio->next; - aio_signal(mesh, channel, aio); - free(aio); - } + aio_abort(mesh, channel, &channel->aio_send); + aio_abort(mesh, channel, &channel->aio_receive); pthread_mutex_unlock(&mesh->mutex); @@ -3881,7 +3915,11 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan /* Ensure the poll callback is set, and call it right now to push data if possible */ utcp_set_poll_cb(channel->c, channel_poll); - channel_poll(channel->c, len); + size_t todo = MIN(len, utcp_get_rcvbuf_free(channel->c)); + + if(todo) { + channel_poll(channel->c, todo); + } pthread_mutex_unlock(&mesh->mutex); @@ -3918,7 +3956,11 @@ bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *c /* Ensure the poll callback is set, and call it right now to push data if possible */ utcp_set_poll_cb(channel->c, channel_poll); - channel_poll(channel->c, len); + size_t left = utcp_get_rcvbuf_free(channel->c); + + if(left) { + channel_poll(channel->c, left); + } pthread_mutex_unlock(&mesh->mutex);