summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
346af23)
When it's called in a callback, we can't free the channel until the
function that called the callback has a chance to safely complete. This
is not a problem for regular receive and poll callbacks, but it is for AIO,
where there can be multiple outstanding AIO buffers that each need their
callback called to signal completion, and each of them could potentially
call meshlink_channel_close().
This also ensures that when the channel is explicitly closed by the
application, it will not receive any further callbacks.
return mesh->channel_accept_cb;
}
return mesh->channel_accept_cb;
}
-static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
- if(aio->data) {
- if(aio->cb.buffer) {
- aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ meshlink_aio_buffer_t *aio = *head;
+ *head = aio->next;
+
+ if(channel->c) {
+ channel->in_callback = true;
+
+ if(aio->data) {
+ if(aio->cb.buffer) {
+ aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+ } else {
+ if(aio->cb.fd) {
+ aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+ }
- } else {
- if(aio->cb.fd) {
- aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+
+ channel->in_callback = false;
+
+ if(!channel->c) {
+ free(aio);
+ free(channel);
+ return false;
+
+ free(aio);
+ return true;
-static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
- while(*aio) {
- meshlink_aio_buffer_t *next = (*aio)->next;
- aio_signal(mesh, channel, *aio);
- free(*aio);
- *aio = next;
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ while(*head) {
+ if(!aio_finish_one(mesh, channel, head)) {
+ return false;
+ }
}
static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
}
static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
while(channel->aio_receive) {
if(!len) {
/* This receive callback signalled an error, abort all outstanding AIO buffers. */
while(channel->aio_receive) {
if(!len) {
/* This receive callback signalled an error, abort all outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_receive);
+ if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
if(result <= 0) {
/* Writing to fd failed, cancel just this AIO buffer. */
logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
if(result <= 0) {
/* Writing to fd failed, cancel just this AIO buffer. */
logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
- channel->aio_receive = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
+
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
left -= todo;
if(aio->done == aio->len) {
left -= todo;
if(aio->done == aio->len) {
- channel->aio_receive = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
while(channel->aio_send) {
if(!len) {
/* This poll callback signalled an error, abort all outstanding AIO buffers. */
while(channel->aio_send) {
if(!len) {
/* This poll callback signalled an error, abort all outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_send);
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
}
logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
}
- channel->aio_send = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
- aio = channel->aio_send;
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
assert(sent < 0);
/* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
assert(sent < 0);
/* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
- aio_abort(mesh, channel, &channel->aio_send);
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
}
/* Signal completion of this buffer, and go to the next one. */
}
/* Signal completion of this buffer, and go to the next one. */
- channel->aio_send = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
pthread_mutex_lock(&mesh->mutex);
pthread_mutex_lock(&mesh->mutex);
- utcp_close(channel->c);
+ if(channel->c) {
+ utcp_close(channel->c);
+ channel->c = NULL;
- /* Clean up any outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_send);
- aio_abort(mesh, channel, &channel->aio_receive);
+ /* Clean up any outstanding AIO buffers. */
+ aio_abort(mesh, channel, &channel->aio_send);
+ aio_abort(mesh, channel, &channel->aio_receive);
+ }
- pthread_mutex_unlock(&mesh->mutex);
+ if(!channel->in_callback) {
+ free(channel);
+ }
+ pthread_mutex_unlock(&mesh->mutex);
}
ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
}
ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
struct meshlink_channel {
struct node_t *node;
void *priv;
struct meshlink_channel {
struct node_t *node;
void *priv;
struct utcp_connection *c;
meshlink_aio_buffer_t *aio_send;
struct utcp_connection *c;
meshlink_aio_buffer_t *aio_send;