]> git.meshlink.io Git - meshlink/commitdiff
Add meshlink_channel_aio_receive().
authorGuus Sliepen <guus@meshlink.io>
Mon, 12 Aug 2019 14:46:02 +0000 (16:46 +0200)
committerGuus Sliepen <guus@meshlink.io>
Mon, 12 Aug 2019 14:46:02 +0000 (16:46 +0200)
This function allows handing over a large buffer to MeshLink which will be
used to receive data without needing intervention from the application.
A callback is called when MeshLink has filled the buffer with the data.

src/meshlink++.h
src/meshlink.c
src/meshlink.h
src/meshlink.sym
src/meshlink_internal.h
test/channels-aio.c

index fbefcea800385a6968c9c94eef75410e5e08de5d..3666f67d827e789d724b431b4c76960ef24bda6a 100644 (file)
@@ -735,7 +735,9 @@ public:
        }
 
        /// Transmit data on a channel asynchronously
-       /** This queues data to send to the remote node.
+       /** This registers a buffer that will be used to send data to the remote node.
+        *  Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
+        *  While there are still buffers with unsent data, the poll callback will not be called.
         *
         *  @param channel      A handle for the channel.
         *  @param data         A pointer to a buffer containing data sent by the source, or NULL if there is no data to send.
@@ -750,6 +752,24 @@ public:
                return meshlink_channel_aio_send(handle, channel, data, len, cb, priv);
        }
 
+       /// Receive data on a channel asynchronously
+       /** This registers a buffer that will be filled with incoming channel data.
+        *  Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
+        *  While there are still buffers that have not been filled, the receive callback will not be called.
+        *
+        *  @param channel      A handle for the channel.
+        *  @param data         A pointer to a buffer that will be filled with incoming data.
+        *                      After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application
+        *                      until the callback routine is called.
+        *  @param len          The length of the data.
+        *  @param cb           A pointer to the function which will be called when MeshLink has finished using the buffer.
+        *
+        *  @return             True if the buffer was enqueued, false otherwise.
+        */
+       bool channel_aio_receive(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+               return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv);
+       }
+
        /// Get the amount of bytes in the send buffer.
        /** This returns the amount of bytes in the send buffer.
         *  These bytes have not been received by the peer yet.
index 27fe945cd734af7d26ddfc17579a05bda74b0626..1b643503f6a9416d72528571697d93a0213e33dd 100644 (file)
@@ -2819,8 +2819,43 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
 
        if(n->status.destroyed) {
                meshlink_channel_close(mesh, channel);
-       } else if(channel->receive_cb) {
-               channel->receive_cb(mesh, channel, data, len);
+               return len;
+       }
+
+       const char *p = data;
+       size_t left = len;
+
+       while(channel->aio_receive) {
+               meshlink_aio_buffer_t *aio = channel->aio_receive;
+               size_t todo = aio->len - aio->done;
+
+               if(todo > left) {
+                       todo = left;
+               }
+
+               memcpy((char *)aio->data + aio->done, p, todo);
+               aio->done += todo;
+
+               if(aio->done == aio->len) {
+                       channel->aio_receive = aio->next;
+
+                       if(aio->cb) {
+                               aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+                       }
+
+                       free(aio);
+               }
+
+               p += todo;
+               left -= todo;
+
+               if(!left && len) {
+                       return len;
+               }
+       }
+
+       if(channel->receive_cb) {
+               channel->receive_cb(mesh, channel, p, left);
        }
 
        return len;
@@ -2890,7 +2925,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
        node_t *n = channel->node;
        meshlink_handle_t *mesh = n->mesh;
-       meshlink_aio_buffer_t *aio = channel->aio;
+       meshlink_aio_buffer_t *aio = channel->aio_send;
 
        if(aio) {
                /* We at least one AIO buffer. Send as much as possible form the first buffer. */
@@ -2908,7 +2943,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
                /* If the buffer is now completely sent, call the callback and dispose of it. */
                if(aio->done >= aio->len) {
-                       channel->aio = aio->next;
+                       channel->aio_send = aio->next;
 
                        if(aio->cb) {
                                aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
@@ -2928,7 +2963,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
        (void)mesh;
        channel->poll_cb = cb;
-       utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL);
+       utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
 }
 
 void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
@@ -3013,7 +3048,17 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
        utcp_close(channel->c);
 
        /* Clean up any outstanding AIO buffers. */
-       for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) {
+       for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
+               next = aio->next;
+
+               if(aio->cb) {
+                       aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+               }
+
+               free(aio);
+       }
+
+       for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
                next = aio->next;
 
                if(aio->cb) {
@@ -3051,7 +3096,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
        pthread_mutex_lock(&mesh->mesh_mutex);
 
        /* Disallow direct calls to utcp_send() while we still have AIO active. */
-       if(channel->aio) {
+       if(channel->aio_send) {
                retval = 0;
        } else {
                retval = utcp_send(channel->c, data, len);
@@ -3086,7 +3131,7 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
        pthread_mutex_lock(&mesh->mesh_mutex);
 
        /* Append the AIO buffer descriptor to the end of the chain */
-       meshlink_aio_buffer_t **p = &channel->aio;
+       meshlink_aio_buffer_t **p = &channel->aio_send;
 
        while(*p) {
                p = &(*p)->next;
@@ -3103,6 +3148,39 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
        return true;
 }
 
+bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || !data) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->data = data;
+       aio->len = len;
+       aio->cb = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
 uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
index 5c5196ab7097ae5e91adb6312017d01ece5ab541..31583e510050589506e493a6afff798aa3eda015 100644 (file)
@@ -1139,7 +1139,9 @@ extern ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t
 typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv);
 
 /// Transmit data on a channel asynchronously
-/** This queues data to send to the remote node.
+/** This registers a buffer that will be used to send data to the remote node.
+ *  Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
+ *  While there are still buffers with unsent data, the poll callback will not be called.
  *
  *  @param mesh         A handle which represents an instance of MeshLink.
  *  @param channel      A handle for the channel.
@@ -1153,6 +1155,23 @@ typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *c
  */
 extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
 
+/// Receive data on a channel asynchronously
+/** This registers a buffer that will be filled with incoming channel data.
+ *  Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
+ *  While there are still buffers that have not been filled, the receive callback will not be called.
+ *
+ *  @param mesh         A handle which represents an instance of MeshLink.
+ *  @param channel      A handle for the channel.
+ *  @param data         A pointer to a buffer that will be filled with incoming data.
+ *                      After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application
+ *                      until the callback routine is called.
+ *  @param len          The length of the data.
+ *  @param cb           A pointer to the function which will be called when MeshLink has finished using the buffer.
+ *
+ *  @return             True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
+
 /// Get channel flags.
 /** This returns the flags used when opening this channel.
  *
index 05fb66c228a07d61c4d366b30434ade5cd428a26..33cb5a1c96ef00862aa3ef208531669bf18186e4 100644 (file)
@@ -9,6 +9,7 @@ devtool_trybind_probe
 meshlink_add_address
 meshlink_add_external_address
 meshlink_blacklist
+meshlink_channel_aio_receive
 meshlink_channel_aio_send
 meshlink_channel_close
 meshlink_channel_get_flags
index 0e3581f5142fa9354350e1efa6451f88dc6e98a7..68de758af73063d5b006c4d3fbc4d3b76458445a 100644 (file)
@@ -221,7 +221,8 @@ struct meshlink_channel {
        void *priv;
 
        struct utcp_connection *c;
-       meshlink_aio_buffer_t *aio;
+       meshlink_aio_buffer_t *aio_send;
+       meshlink_aio_buffer_t *aio_receive;
        meshlink_channel_receive_cb_t receive_cb;
        meshlink_channel_poll_cb_t poll_cb;
 };
index ca79cce955526ae6fb837507aa3d01427dece41d..ef4c0a3fd1c27fe37a53466fa6c6433fa5c77292 100644 (file)
@@ -10,6 +10,7 @@
 
 static const size_t size = 10000000; // size of data to transfer
 static bool bar_reachable = false;
+static int bar_callbacks = 0;
 static int foo_callbacks = 0;
 static size_t bar_received = 0;
 static struct sync_flag bar_finished_flag;
@@ -50,12 +51,14 @@ void foo_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void
        foo_callbacks++;
 }
 
-void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+void bar_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+       (void)mesh;
        (void)channel;
+       (void)data;
+       (void)len;
+       (void)priv;
 
-       char *indata = mesh->priv;
-       memcpy(indata, data, len);
-       mesh->priv = indata + len;
+       bar_callbacks++;
        bar_received += len;
 
        if(bar_received >= size) {
@@ -75,12 +78,13 @@ bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t po
 
 bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
        assert(port == 7);
+       assert(!data);
+       assert(!len);
+       char *outdata = mesh->priv;
 
-       meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb);
-
-       if(data) {
-               bar_receive_cb(mesh, channel, data, len);
-       }
+       meshlink_set_channel_receive_cb(mesh, channel, NULL);
+       assert(meshlink_channel_aio_receive(mesh, channel, outdata, size / 4, bar_aio_cb, NULL));
+       assert(meshlink_channel_aio_receive(mesh, channel, outdata + size / 4, size - size / 4, bar_aio_cb, NULL));
 
        return true;
 }
@@ -173,6 +177,7 @@ int main(int argc, char *argv[]) {
        assert(wait_sync_flag(&bar_finished_flag, 10));
 
        assert(foo_callbacks == 2);
+       assert(bar_callbacks == 2);
        assert(bar_received == size);
        assert(!memcmp(indata, outdata, size));