X-Git-Url: http://git.meshlink.io/?p=meshlink;a=blobdiff_plain;f=src%2Fmeshlink.c;h=1118b8a5e6b31b5d24cda596b66117099fea661a;hp=80bcc9704480afc84c2bbffe2203db85d2a93ac3;hb=26fdd4fc9d2a2cc12b0118c3061a65ab3f3ee6c4;hpb=346af236de55f2039825eb6550bc0b662e6fc31b diff --git a/src/meshlink.c b/src/meshlink.c index 80bcc970..1118b8a5 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3455,25 +3455,46 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { return mesh->channel_accept_cb; } -static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) { - if(aio->data) { - if(aio->cb.buffer) { - aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); +/* Finish one AIO buffer, return true if the channel is still open. */ +static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { + meshlink_aio_buffer_t *aio = *head; + *head = aio->next; + + if(channel->c) { + channel->in_callback = true; + + if(aio->data) { + if(aio->cb.buffer) { + aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); + } + } else { + if(aio->cb.fd) { + aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + } } - } else { - if(aio->cb.fd) { - aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + + channel->in_callback = false; + + if(!channel->c) { + free(aio); + free(channel); + return false; } } + + free(aio); + return true; } -static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) { - while(*aio) { - meshlink_aio_buffer_t *next = (*aio)->next; - aio_signal(mesh, channel, *aio); - free(*aio); - *aio = next; +/* Finish all AIO buffers, return true if the channel is still open. */ +static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { + while(*head) { + if(!aio_finish_one(mesh, channel, head)) { + return false; + } } + + return true; } static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { @@ -3497,7 +3518,10 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data while(channel->aio_receive) { if(!len) { /* This receive callback signalled an error, abort all outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_receive); + if(!aio_abort(mesh, channel, &channel->aio_receive)) { + return len; + } + break; } @@ -3516,9 +3540,11 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data if(result <= 0) { /* Writing to fd failed, cancel just this AIO buffer. */ logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno)); - channel->aio_receive = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + + if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { + return len; + } + continue; } @@ -3530,9 +3556,9 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data left -= todo; if(aio->done == aio->len) { - channel->aio_receive = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { + return len; + } } if(!left) { @@ -3626,7 +3652,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { while(channel->aio_send) { if(!len) { /* This poll callback signalled an error, abort all outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); + if(!aio_abort(mesh, channel, &channel->aio_send)) { + return; + } + break; } @@ -3654,10 +3683,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno)); } - channel->aio_send = aio->next; - aio_signal(mesh, channel, aio); - free(aio); - aio = channel->aio_send; + if(!aio_finish_one(mesh, channel, &channel->aio_send)) { + return; + } + continue; } } @@ -3667,7 +3696,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { assert(sent < 0); /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */ - aio_abort(mesh, channel, &channel->aio_send); + if(!aio_abort(mesh, channel, &channel->aio_send)) { + return; + } + len = 0; break; } @@ -3681,9 +3713,9 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { } /* Signal completion of this buffer, and go to the next one. */ - channel->aio_send = aio->next; - aio_signal(mesh, channel, aio); - free(aio); + if(!aio_finish_one(mesh, channel, &channel->aio_send)) { + return; + } if(!len) { return; @@ -3834,15 +3866,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel pthread_mutex_lock(&mesh->mutex); - utcp_close(channel->c); + if(channel->c) { + utcp_close(channel->c); + channel->c = NULL; - /* Clean up any outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); - aio_abort(mesh, channel, &channel->aio_receive); + /* Clean up any outstanding AIO buffers. */ + aio_abort(mesh, channel, &channel->aio_send); + aio_abort(mesh, channel, &channel->aio_receive); + } - pthread_mutex_unlock(&mesh->mutex); + if(!channel->in_callback) { + free(channel); + } - free(channel); + pthread_mutex_unlock(&mesh->mutex); } ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {