return mesh->channel_accept_cb;
}
-static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
- if(aio->data) {
- if(aio->cb.buffer) {
- aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ meshlink_aio_buffer_t *aio = *head;
+ *head = aio->next;
+
+ if(channel->c) {
+ channel->in_callback = true;
+
+ if(aio->data) {
+ if(aio->cb.buffer) {
+ aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+ } else {
+ if(aio->cb.fd) {
+ aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+ }
}
- } else {
- if(aio->cb.fd) {
- aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+
+ channel->in_callback = false;
+
+ if(!channel->c) {
+ free(aio);
+ free(channel);
+ return false;
}
}
+
+ free(aio);
+ return true;
}
-static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
- while(*aio) {
- meshlink_aio_buffer_t *next = (*aio)->next;
- aio_signal(mesh, channel, *aio);
- free(*aio);
- *aio = next;
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ while(*head) {
+ if(!aio_finish_one(mesh, channel, head)) {
+ return false;
+ }
}
+
+ return true;
}
static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
while(channel->aio_receive) {
if(!len) {
/* This receive callback signalled an error, abort all outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_receive);
+ if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
break;
}
if(result <= 0) {
/* Writing to fd failed, cancel just this AIO buffer. */
logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
- channel->aio_receive = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
+
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
continue;
}
left -= todo;
if(aio->done == aio->len) {
- channel->aio_receive = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
}
if(!left) {
while(channel->aio_send) {
if(!len) {
/* This poll callback signalled an error, abort all outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_send);
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
break;
}
logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
}
- channel->aio_send = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
- aio = channel->aio_send;
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
continue;
}
}
assert(sent < 0);
/* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
- aio_abort(mesh, channel, &channel->aio_send);
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
len = 0;
break;
}
}
/* Signal completion of this buffer, and go to the next one. */
- channel->aio_send = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
if(!len) {
return;
pthread_mutex_lock(&mesh->mutex);
- utcp_close(channel->c);
+ if(channel->c) {
+ utcp_close(channel->c);
+ channel->c = NULL;
- /* Clean up any outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_send);
- aio_abort(mesh, channel, &channel->aio_receive);
+ /* Clean up any outstanding AIO buffers. */
+ aio_abort(mesh, channel, &channel->aio_send);
+ aio_abort(mesh, channel, &channel->aio_receive);
+ }
- pthread_mutex_unlock(&mesh->mutex);
+ if(!channel->in_callback) {
+ free(channel);
+ }
- free(channel);
+ pthread_mutex_unlock(&mesh->mutex);
}
ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {