}
}
+static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
+ while(*aio) {
+ meshlink_aio_buffer_t *next = (*aio)->next;
+ aio_signal(mesh, channel, *aio);
+ free(*aio);
+ *aio = next;
+ }
+}
+
static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
meshlink_channel_t *channel = connection->priv;
size_t left = len;
while(channel->aio_receive) {
+ if(!len) {
+ /* This receive callback signalled an error, abort all outstanding AIO buffers. */
+ aio_abort(mesh, channel, &channel->aio_receive);
+ break;
+ }
+
meshlink_aio_buffer_t *aio = channel->aio_receive;
size_t todo = aio->len - aio->done;
if(aio) {
/* We at least one AIO buffer. Send as much as possible form the first buffer. */
size_t left = aio->len - aio->done;
+ if(!len) {
+ /* This poll callback signalled an error, abort all outstanding AIO buffers. */
+ aio_abort(mesh, channel, &channel->aio_send);
+ if(channel->poll_cb) {
+ channel->poll_cb(mesh, channel, 0);
+ } else {
+ utcp_set_poll_cb(connection, NULL);
+ }
+ return;
+ }
+
ssize_t sent;
if(len > left) {
utcp_close(channel->c);
/* Clean up any outstanding AIO buffers. */
- for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
- next = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
- }
-
- for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
- next = aio->next;
- aio_signal(mesh, channel, aio);
- free(aio);
- }
+ aio_abort(mesh, channel, &channel->aio_send);
+ aio_abort(mesh, channel, &channel->aio_receive);
pthread_mutex_unlock(&mesh->mutex);