From c5fca892e8de24439222038a5dd057bcff8b69f1 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Fri, 29 May 2020 23:41:43 +0200 Subject: [PATCH] Add AIO send fds to the event loop if they would block. --- src/meshlink.c | 54 ++++++-- src/meshlink_internal.h | 1 + test/Makefile.am | 5 + test/channels-aio-fd-wouldblock.c | 196 ++++++++++++++++++++++++++++++ 4 files changed, 249 insertions(+), 7 deletions(-) create mode 100644 test/channels-aio-fd-wouldblock.c diff --git a/src/meshlink.c b/src/meshlink.c index 22f5220d..671b081f 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3465,6 +3465,10 @@ static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio = *head; *head = aio->next; + if(!aio->data && aio->io.cb) { + io_del(&mesh->loop, &aio->io); + } + if(channel->c) { channel->in_callback = true; @@ -3648,6 +3652,24 @@ static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, co utcp_recv(n->utcp, data, len); } +static void channel_poll(struct utcp_connection *connection, size_t len); + +static void aio_fd_poll(event_loop_t *loop, void *data, int flags) { + (void)flags; + meshlink_channel_t *channel = data; + meshlink_aio_buffer_t *aio = channel->aio_send; + assert(aio); + + io_set(loop, &aio->io, 0); + + utcp_set_poll_cb(channel->c, channel_poll); + size_t left = utcp_get_rcvbuf_free(channel->c); + + if(left) { + channel_poll(channel->c, left); + } +} + static void channel_poll(struct utcp_connection *connection, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -3692,8 +3714,21 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { todo = result; sent = utcp_send(connection, buf, todo); } else { - if(result < 0 && errno == EINTR) { - continue; + if(result < 0) { + if(errno == EINTR) { + continue; + } else if(errno == EAGAIN || errno == EWOULDBLOCK) { + /* The read would block, add it to the event loop. */ + utcp_set_poll_cb(connection, NULL); + + if(aio->io.cb) { + io_set(&mesh->loop, &aio->io, IO_READ); + } else { + io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, aio->fd, IO_READ); + } + + return; + } } /* Reading from fd failed, cancel just this AIO buffer. */ @@ -3727,6 +3762,10 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { /* If we didn't finish this buffer, exit early. */ if(aio->done < aio->len) { + if(!aio->data && len) { + continue; + } + return; } @@ -3981,6 +4020,7 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan return true; } + bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL; @@ -4009,12 +4049,12 @@ bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *c *p = aio; - /* Ensure the poll callback is set, and call it right now to push data if possible */ - utcp_set_poll_cb(channel->c, channel_poll); - size_t left = utcp_get_rcvbuf_free(channel->c); + /* Add it to the event loop if it's the head AIO buffer */ - if(left) { - channel_poll(channel->c, left); + if(p == &channel->aio_send) { + io_add(&mesh->loop, &aio->io, aio_fd_poll, channel, fd, IO_READ); + io_set(&mesh->loop, &aio->io, IO_READ); + signal_trigger(&mesh->loop, &mesh->datafromapp); } pthread_mutex_unlock(&mesh->mutex); diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index d6dc87d0..e9902743 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -209,6 +209,7 @@ struct meshlink_submesh { typedef struct meshlink_aio_buffer { const void *data; int fd; + io_t io; size_t len; size_t done; union { diff --git a/test/Makefile.am b/test/Makefile.am index 28f92560..c9168a85 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -6,6 +6,7 @@ TESTS = \ channels-aio \ channels-aio-cornercases \ channels-aio-fd \ + channels-aio-fd-wouldblock \ channels-cornercases \ channels-failure \ channels-fork \ @@ -40,6 +41,7 @@ check_PROGRAMS = \ channels-aio \ channels-aio-cornercases \ channels-aio-fd \ + channels-aio-fd-wouldblock \ channels-cornercases \ channels-failure \ channels-fork \ @@ -82,6 +84,9 @@ channels_aio_cornercases_LDADD = $(top_builddir)/src/libmeshlink.la channels_aio_fd_SOURCES = channels-aio-fd.c utils.c utils.h channels_aio_fd_LDADD = $(top_builddir)/src/libmeshlink.la +channels_aio_fd_wouldblock_SOURCES = channels-aio-fd-wouldblock.c utils.c utils.h +channels_aio_fd_wouldblock_LDADD = $(top_builddir)/src/libmeshlink.la + channels_no_partial_SOURCES = channels-no-partial.c utils.c utils.h channels_no_partial_LDADD = $(top_builddir)/src/libmeshlink.la diff --git a/test/channels-aio-fd-wouldblock.c b/test/channels-aio-fd-wouldblock.c new file mode 100644 index 00000000..a436ef42 --- /dev/null +++ b/test/channels-aio-fd-wouldblock.c @@ -0,0 +1,196 @@ +#ifdef NDEBUG +#undef NDEBUG +#endif + +#include +#include +#include +#include +#include + +#include "meshlink.h" +#include "utils.h" + +static size_t received; +static struct sync_flag recv_flag; +static struct sync_flag close_flag; +static struct sync_flag poll_flag; +static struct sync_flag aio_done_flag; + +static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) { + (void)mesh; + (void)channel; + (void)fd; + (void)len; + (void)priv; + + set_sync_flag(&aio_done_flag, true); +} + +static void aio_fd_cb_ignore(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, void *priv) { + (void)mesh; + (void)channel; + (void)fd; + (void)len; + (void)priv; +} + +static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { + (void)mesh; + (void)channel; + (void)len; + + if(!data) { + set_sync_flag(&close_flag, true); + meshlink_channel_close(mesh, channel); + } + + received += len; + set_sync_flag(&recv_flag, true); +} + +static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { + (void)port; + (void)data; + (void)len; + meshlink_set_channel_receive_cb(mesh, channel, receive_cb); + return true; +} + +static void poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) { + (void)len; + meshlink_set_channel_poll_cb(mesh, channel, NULL); + set_sync_flag(&poll_flag, true); +} + +int main(int argc, char *argv[]) { + (void)argc; + (void)argv; + + meshlink_set_log_cb(NULL, MESHLINK_WARNING, log_cb); + + // Open two new meshlink instance. + + meshlink_handle_t *mesh_a, *mesh_b; + open_meshlink_pair(&mesh_a, &mesh_b, "channels_aio_fd"); + + // Set the callbacks. + + meshlink_set_channel_accept_cb(mesh_b, accept_cb); + + // Start both instances + + start_meshlink_pair(mesh_a, mesh_b); + + // Open a channel from a to b. + + meshlink_node_t *b = meshlink_get_node(mesh_a, "b"); + assert(b); + + meshlink_channel_t *channel = meshlink_channel_open(mesh_a, b, 1, NULL, NULL, 0); + assert(channel); + + // Wait for the channel to be fully established + + meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb); + assert(wait_sync_flag(&poll_flag, 10)); + + // Create a UNIX stream socket + + int fds[2]; + assert(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0); + assert(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0); + + // Enqueue 3 AIO buffers for the same fd + + assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL)); + assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb_ignore, NULL)); + assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], 200, aio_fd_cb, NULL)); + + // Fill the first buffer with two packets + + char buf[65535] = ""; + + sleep(1); + assert(write(fds[0], buf, 100) == 100); + assert(wait_sync_flag(&recv_flag, 2)); + assert(received == 100); + + sleep(1); + assert(!check_sync_flag(&aio_done_flag)); + set_sync_flag(&recv_flag, false); + assert(write(fds[0], buf, 100) == 100); + assert(wait_sync_flag(&recv_flag, 2)); + assert(received == 200); + + assert(wait_sync_flag(&aio_done_flag, 1)); + set_sync_flag(&aio_done_flag, false); + + // Fill half of the second buffer + + set_sync_flag(&recv_flag, false); + assert(write(fds[0], buf, 100) == 100); + assert(wait_sync_flag(&recv_flag, 2)); + assert(received == 300); + + // Send one packet that spans two AIO buffers + + sleep(1); + assert(!check_sync_flag(&aio_done_flag)); + assert(write(fds[0], buf, 300) == 300); + assert(wait_sync_flag(&aio_done_flag, 10)); + + // Close the channel and wait for the remaining data + + meshlink_channel_close(mesh_a, channel); + assert(wait_sync_flag(&close_flag, 10)); + assert(received == 600); + + // Create a UDP channel + + channel = meshlink_channel_open_ex(mesh_a, b, 1, NULL, NULL, 0, MESHLINK_CHANNEL_UDP); + assert(channel); + + // Wait for the channel to be fully established + + received = 0; + set_sync_flag(&poll_flag, false); + set_sync_flag(&recv_flag, false); + set_sync_flag(&close_flag, false); + meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb); + assert(wait_sync_flag(&poll_flag, 10)); + + // Enqueue a huge AIO buffer + + set_sync_flag(&aio_done_flag, false); + assert(meshlink_channel_aio_fd_send(mesh_a, channel, fds[1], -1, aio_fd_cb, NULL)); + + // Send a small and a big packets + + assert(write(fds[0], buf, 100) == 100); + assert(wait_sync_flag(&recv_flag, 2)); + assert(received == 100); + + sleep(1); + assert(!check_sync_flag(&aio_done_flag)); + set_sync_flag(&recv_flag, false); + assert(write(fds[0], buf, 65535) == 65535); + assert(wait_sync_flag(&recv_flag, 2)); + assert(received == 65635); + + // Close the fds, this should terminate the AIO buffer + + sleep(1); + assert(!check_sync_flag(&aio_done_flag)); + close(fds[0]); + assert(wait_sync_flag(&aio_done_flag, 10)); + close(fds[1]); + + meshlink_channel_close(mesh_a, channel); + assert(wait_sync_flag(&close_flag, 10)); + assert(received == 65635); + + // Clean up. + + close_meshlink_pair(mesh_a, mesh_b); +} -- 2.39.5