]> git.meshlink.io Git - meshlink/commitdiff
Add AIO send fds to the event loop if they would block. feature/aio-socket-fd
authorGuus Sliepen <guus@meshlink.io>
Fri, 29 May 2020 21:41:43 +0000 (23:41 +0200)
committerGuus Sliepen <guus@meshlink.io>
Sat, 30 May 2020 21:18:23 +0000 (23:18 +0200)
src/meshlink.c
src/meshlink_internal.h
test/Makefile.am
test/channels-aio-fd-wouldblock.c [new file with mode: 0644]

index 22f5220d4beebe6c19d6612d45923602e0e78061..671b081f09542debd1e5ee3ec1b53f0ea5897ebf 100644 (file)
@@ -3465,6 +3465,10 @@ static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel,
        meshlink_aio_buffer_t *aio = *head;
        *head = aio->next;
 
+       if(!aio->data && aio->io.cb) {
+               io_del(&mesh->loop, &aio->io);
+       }
+
        if(channel->c) {
                channel->in_callback = true;
 
@@ -3648,6 +3652,24 @@ static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, co
        utcp_recv(n->utcp, data, len);
 }
 
+static void channel_poll(struct utcp_connection *connection, size_t len);
+
+static void aio_fd_poll(event_loop_t *loop, void *data, int flags) {
+       (void)flags;
+       meshlink_channel_t *channel = data;
+       meshlink_aio_buffer_t *aio = channel->aio_send;
+       assert(aio);
+
+       io_set(loop, &aio->io, 0);
+
+       utcp_set_poll_cb(channel->c, channel_poll);
+       size_t left = utcp_get_rcvbuf_free(channel->c);
+
+       if(left) {
+               channel_poll(channel->c, left);
+       }
+}
+
 static void channel_poll(struct utcp_connection *connection, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
@@ -3692,8 +3714,21 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                                todo = result;
                                sent = utcp_send(connection, buf, todo);
                        } else {
-                               if(result < 0 && errno == EINTR) {
-                                       continue;
+                               if(result < 0) {
+                                       if(errno == EINTR) {
+                                               continue;
+                                       } else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+                                               /* The read would block, add it to the event loop. */
+                                               utcp_set_poll_cb(connection, NULL);
+
+                                               if(aio->io.cb) {
+                                                       io_set(&mesh->loop, &aio->io, IO_READ);
+                                               } else {
+                                                       io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, aio->fd, IO_READ);
+                                               }
+
+                                               return;
+                                       }
                                }
 
                                /* Reading from fd failed, cancel just this AIO buffer. */
@@ -3727,6 +3762,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
                /* If we didn't finish this buffer, exit early. */
                if(aio->done < aio->len) {
+                       if(!aio->data && len) {
+                               continue;
+                       }
+
                        return;
                }
 
@@ -3981,6 +4020,7 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
        return true;
 }
 
+
 bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
@@ -4009,12 +4049,12 @@ bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *c
 
        *p = aio;
 
-       /* Ensure the poll callback is set, and call it right now to push data if possible */
-       utcp_set_poll_cb(channel->c, channel_poll);
-       size_t left = utcp_get_rcvbuf_free(channel->c);
+       /* Add it to the event loop if it's the head AIO buffer */
 
-       if(left) {
-               channel_poll(channel->c, left);
+       if(p == &channel->aio_send) {
+               io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, fd, IO_READ);
+               io_set(&mesh->loop, &aio->io, IO_READ);
+               signal_trigger(&mesh->loop, &mesh->datafromapp);
        }
 
        pthread_mutex_unlock(&mesh->mutex);
index d6dc87d04f414d7149fdfea3e190342cb728fa15..e990274365a42e47022078e2489f289fd1c55eea 100644 (file)
@@ -209,6 +209,7 @@ struct meshlink_submesh {
 typedef struct meshlink_aio_buffer {
        const void *data;
        int fd;
+       io_t io;
        size_t len;
        size_t done;
        union {
index 28f92560071fbe800eeb29fc61fec23d9b85b3b6..c9168a855d0988d933ef79607ce04e0935c300bc 100644 (file)
@@ -6,6 +6,7 @@ TESTS = \
        channels-aio \
        channels-aio-cornercases \
        channels-aio-fd \
+       channels-aio-fd-wouldblock \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -40,6 +41,7 @@ check_PROGRAMS = \
        channels-aio \
        channels-aio-cornercases \
        channels-aio-fd \
+       channels-aio-fd-wouldblock \
        channels-cornercases \
        channels-failure \
        channels-fork \
@@ -82,6 +84,9 @@ channels_aio_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la
 channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h
 channels_aio_fd_LDADD = $(top_builddir)/src/libmeshlink.la
 
+channels_aio_fd_wouldblock_SOURCES = channels-aio-fd-wouldblock.c utils.c utils.h
+channels_aio_fd_wouldblock_LDADD = $(top_builddir)/src/libmeshlink.la
+
 channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h
 channels_no_partial_LDADD = $(top_builddir)/src/libmeshlink.la
 
diff --git a/test/channels-aio-fd-wouldblock.c b/test/channels-aio-fd-wouldblock.c
new file mode 100644 (file)
index 0000000..a436ef4
--- /dev/null
@@ -0,0 +1,196 @@
+#ifdef NDEBUG
+#undef NDEBUG
+#endif
+
+#include <assert.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <fcntl.h>
+
+#include "meshlink.h"
+#include "utils.h"
+
+static size_t received;
+static struct sync_flag recv_flag;
+static struct sync_flag close_flag;
+static struct sync_flag poll_flag;
+static struct sync_flag aio_done_flag;
+
+static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
+       (void)mesh;
+       (void)channel;
+       (void)fd;
+       (void)len;
+       (void)priv;
+
+       set_sync_flag(&aio_done_flag, true);
+}
+
+static void aio_fd_cb_ignore(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) {
+       (void)mesh;
+       (void)channel;
+       (void)fd;
+       (void)len;
+       (void)priv;
+}
+
+static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+       (void)mesh;
+       (void)channel;
+       (void)len;
+
+       if(!data) {
+               set_sync_flag(&close_flag, true);
+               meshlink_channel_close(mesh, channel);
+       }
+
+       received += len;
+       set_sync_flag(&recv_flag, true);
+}
+
+static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
+       (void)port;
+       (void)data;
+       (void)len;
+       meshlink_set_channel_receive_cb(mesh, channel, receive_cb);
+       return true;
+}
+
+static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) {
+       (void)len;
+       meshlink_set_channel_poll_cb(mesh, channel, NULL);
+       set_sync_flag(&poll_flag, true);
+}
+
+int main(int argc, char *argv[]) {
+       (void)argc;
+       (void)argv;
+
+       meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb);
+
+       // Open two new meshlink instance.
+
+       meshlink_handle_t *mesh_a, *mesh_b;
+       open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio_fd");
+
+       // Set the callbacks.
+
+       meshlink_set_channel_accept_cb(mesh_b, accept_cb);
+
+       // Start both instances
+
+       start_meshlink_pair(mesh_a, mesh_b);
+
+       // Open a channel from a to b.
+
+       meshlink_node_t *b = meshlink_get_node(mesh_a, "b");
+       assert(b);
+
+       meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 1, NULL, NULL, 0);
+       assert(channel);
+
+       // Wait for the channel to be fully established
+
+       meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
+       assert(wait_sync_flag(&poll_flag, 10));
+
+       // Create a UNIX stream socket
+
+       int fds[2];
+       assert(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
+       assert(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
+
+       // Enqueue 3 AIO buffers for the same fd
+
+       assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL));
+       assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb_ignore, NULL));
+       assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL));
+
+       // Fill the first buffer with two packets
+
+       char buf[65535] = "";
+
+       sleep(1);
+       assert(write(fds[0], buf, 100) == 100);
+       assert(wait_sync_flag(&recv_flag, 2));
+       assert(received == 100);
+
+       sleep(1);
+       assert(!check_sync_flag(&aio_done_flag));
+       set_sync_flag(&recv_flag, false);
+       assert(write(fds[0], buf, 100) == 100);
+       assert(wait_sync_flag(&recv_flag, 2));
+       assert(received == 200);
+
+       assert(wait_sync_flag(&aio_done_flag, 1));
+       set_sync_flag(&aio_done_flag, false);
+
+       // Fill half of the second buffer
+
+       set_sync_flag(&recv_flag, false);
+       assert(write(fds[0], buf, 100) == 100);
+       assert(wait_sync_flag(&recv_flag, 2));
+       assert(received == 300);
+
+       // Send one packet that spans two AIO buffers
+
+       sleep(1);
+       assert(!check_sync_flag(&aio_done_flag));
+       assert(write(fds[0], buf, 300) == 300);
+       assert(wait_sync_flag(&aio_done_flag, 10));
+
+       // Close the channel and wait for the remaining data
+
+       meshlink_channel_close(mesh_a, channel);
+       assert(wait_sync_flag(&close_flag, 10));
+       assert(received == 600);
+
+       // Create a UDP channel
+
+       channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP);
+       assert(channel);
+
+       // Wait for the channel to be fully established
+
+       received = 0;
+       set_sync_flag(&poll_flag, false);
+       set_sync_flag(&recv_flag, false);
+       set_sync_flag(&close_flag, false);
+       meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
+       assert(wait_sync_flag(&poll_flag, 10));
+
+       // Enqueue a huge AIO buffer
+
+       set_sync_flag(&aio_done_flag, false);
+       assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], -1, aio_fd_cb, NULL));
+
+       // Send a small and a big packets
+
+       assert(write(fds[0], buf, 100) == 100);
+       assert(wait_sync_flag(&recv_flag, 2));
+       assert(received == 100);
+
+       sleep(1);
+       assert(!check_sync_flag(&aio_done_flag));
+       set_sync_flag(&recv_flag, false);
+       assert(write(fds[0], buf, 65535) == 65535);
+       assert(wait_sync_flag(&recv_flag, 2));
+       assert(received == 65635);
+
+       // Close the fds, this should terminate the AIO buffer
+
+       sleep(1);
+       assert(!check_sync_flag(&aio_done_flag));
+       close(fds[0]);
+       assert(wait_sync_flag(&aio_done_flag, 10));
+       close(fds[1]);
+
+       meshlink_channel_close(mesh_a, channel);
+       assert(wait_sync_flag(&close_flag, 10));
+       assert(received == 65635);
+
+       // Clean up.
+
+       close_meshlink_pair(mesh_a, mesh_b);
+}