]> git.meshlink.io Git - meshlink/blobdiff - src/meshlink.c
Add AIO send fds to the event loop if they would block.
[meshlink] / src / meshlink.c
index 22f5220d4beebe6c19d6612d45923602e0e78061..671b081f09542debd1e5ee3ec1b53f0ea5897ebf 100644 (file)
@@ -3465,6 +3465,10 @@ static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel,
        meshlink_aio_buffer_t *aio = *head;
        *head = aio->next;
 
+       if(!aio->data && aio->io.cb) {
+               io_del(&mesh->loop, &aio->io);
+       }
+
        if(channel->c) {
                channel->in_callback = true;
 
@@ -3648,6 +3652,24 @@ static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, co
        utcp_recv(n->utcp, data, len);
 }
 
+static void channel_poll(struct utcp_connection *connection, size_t len);
+
+static void aio_fd_poll(event_loop_t *loop, void *data, int flags) {
+       (void)flags;
+       meshlink_channel_t *channel = data;
+       meshlink_aio_buffer_t *aio = channel->aio_send;
+       assert(aio);
+
+       io_set(loop, &aio->io, 0);
+
+       utcp_set_poll_cb(channel->c, channel_poll);
+       size_t left = utcp_get_rcvbuf_free(channel->c);
+
+       if(left) {
+               channel_poll(channel->c, left);
+       }
+}
+
 static void channel_poll(struct utcp_connection *connection, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
@@ -3692,8 +3714,21 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                                todo = result;
                                sent = utcp_send(connection, buf, todo);
                        } else {
-                               if(result < 0 && errno == EINTR) {
-                                       continue;
+                               if(result < 0) {
+                                       if(errno == EINTR) {
+                                               continue;
+                                       } else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+                                               /* The read would block, add it to the event loop. */
+                                               utcp_set_poll_cb(connection, NULL);
+
+                                               if(aio->io.cb) {
+                                                       io_set(&mesh->loop, &aio->io, IO_READ);
+                                               } else {
+                                                       io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, aio->fd, IO_READ);
+                                               }
+
+                                               return;
+                                       }
                                }
 
                                /* Reading from fd failed, cancel just this AIO buffer. */
@@ -3727,6 +3762,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
                /* If we didn't finish this buffer, exit early. */
                if(aio->done < aio->len) {
+                       if(!aio->data && len) {
+                               continue;
+                       }
+
                        return;
                }
 
@@ -3981,6 +4020,7 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
        return true;
 }
 
+
 bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
@@ -4009,12 +4049,12 @@ bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *c
 
        *p = aio;
 
-       /* Ensure the poll callback is set, and call it right now to push data if possible */
-       utcp_set_poll_cb(channel->c, channel_poll);
-       size_t left = utcp_get_rcvbuf_free(channel->c);
+       /* Add it to the event loop if it's the head AIO buffer */
 
-       if(left) {
-               channel_poll(channel->c, left);
+       if(p == &channel->aio_send) {
+               io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, fd, IO_READ);
+               io_set(&mesh->loop, &aio->io, IO_READ);
+               signal_trigger(&mesh->loop, &mesh->datafromapp);
        }
 
        pthread_mutex_unlock(&mesh->mutex);