meshlink_aio_buffer_t *aio = *head;
*head = aio->next;
+ if(!aio->data && aio->io.cb) {
+ io_del(&mesh->loop, &aio->io);
+ }
+
if(channel->c) {
channel->in_callback = true;
utcp_recv(n->utcp, data, len);
}
+static void channel_poll(struct utcp_connection *connection, size_t len);
+
+static void aio_fd_poll(event_loop_t *loop, void *data, int flags) {
+ (void)flags;
+ meshlink_channel_t *channel = data;
+ meshlink_aio_buffer_t *aio = channel->aio_send;
+ assert(aio);
+
+ io_set(loop, &aio->io, 0);
+
+ utcp_set_poll_cb(channel->c, channel_poll);
+ size_t left = utcp_get_rcvbuf_free(channel->c);
+
+ if(left) {
+ channel_poll(channel->c, left);
+ }
+}
+
static void channel_poll(struct utcp_connection *connection, size_t len) {
meshlink_channel_t *channel = connection->priv;
todo = result;
sent = utcp_send(connection, buf, todo);
} else {
- if(result < 0 && errno == EINTR) {
- continue;
+ if(result < 0) {
+ if(errno == EINTR) {
+ continue;
+ } else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* The read would block, add it to the event loop. */
+ utcp_set_poll_cb(connection, NULL);
+
+ if(aio->io.cb) {
+ io_set(&mesh->loop, &aio->io, IO_READ);
+ } else {
+ io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, aio->fd, IO_READ);
+ }
+
+ return;
+ }
}
/* Reading from fd failed, cancel just this AIO buffer. */
/* If we didn't finish this buffer, exit early. */
if(aio->done < aio->len) {
+ if(!aio->data && len) {
+ continue;
+ }
+
return;
}
return true;
}
+
bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
if(!mesh || !channel) {
meshlink_errno = MESHLINK_EINVAL;
*p = aio;
- /* Ensure the poll callback is set, and call it right now to push data if possible */
- utcp_set_poll_cb(channel->c, channel_poll);
- size_t left = utcp_get_rcvbuf_free(channel->c);
+ /* Add it to the event loop if it's the head AIO buffer */
- if(left) {
- channel_poll(channel->c, left);
+ if(p == &channel->aio_send) {
+ io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, fd, IO_READ);
+ io_set(&mesh->loop, &aio->io, IO_READ);
+ signal_trigger(&mesh->loop, &mesh->datafromapp);
}
pthread_mutex_unlock(&mesh->mutex);