]> git.meshlink.io Git - meshlink/commitdiff
Add meshlink_channel_aio_send().
authorGuus Sliepen <guus@meshlink.io>
Mon, 12 Aug 2019 11:43:01 +0000 (13:43 +0200)
committerGuus Sliepen <guus@meshlink.io>
Mon, 12 Aug 2019 12:01:09 +0000 (14:01 +0200)
This function allows handing over a large amount of data to MeshLink
which will be sent without needing intervention from the application.
A callback is called when MeshLink is done with the data, so the
application can call any cleanup function it needs to call.

src/meshlink++.h
src/meshlink.c
src/meshlink.h
src/meshlink.sym
src/meshlink_internal.h

index 2dcac00201f8f658903cdaef2c0b604aa45d2131..fbefcea800385a6968c9c94eef75410e5e08de5d 100644 (file)
@@ -98,6 +98,18 @@ typedef void (*channel_receive_cb_t)(mesh *mesh, channel *channel, const void *d
  */
 typedef void (*channel_poll_cb_t)(mesh *mesh, channel *channel, size_t len);
 
+/// A callback for cleaning up buffers submitted for asynchronous I/O.
+/** This callbacks signals that MeshLink has finished using this buffer.
+ *  The ownership of the buffer is now back into the application's hands.
+ *
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel which used this buffer.
+ *  @param data      A pointer to a buffer containing the enqueued data.
+ *  @param len       The length of the buffer.
+ *  @param priv      A private pointer which was set by the application when submitting the buffer.
+ */
+typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv);
+
 /// A class describing a MeshLink node.
 class node: public meshlink_node_t {
 };
@@ -722,6 +734,22 @@ public:
                return meshlink_channel_send(handle, channel, data, len);
        }
 
+       /// Transmit data on a channel asynchronously
+       /** This queues data to send to the remote node.
+        *
+        *  @param channel      A handle for the channel.
+        *  @param data         A pointer to a buffer containing data sent by the source, or NULL if there is no data to send.
+        *                      After meshlink_channel_aio_send() returns, the buffer may not be modified or freed by the application
+        *                      until the callback routine is called.
+        *  @param len          The length of the data, or 0 if there is no data to send.
+        *  @param cb           A pointer to the function which will be called when MeshLink has finished using the buffer.
+        *
+        *  @return             True if the buffer was enqueued, false otherwise.
+        */
+       bool channel_aio_send(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+               return meshlink_channel_aio_send(handle, channel, data, len, cb, priv);
+       }
+
        /// Get the amount of bytes in the send buffer.
        /** This returns the amount of bytes in the send buffer.
         *  These bytes have not been received by the peer yet.
index 20dad9f43caf437d15155931f2d528b3e1a30e42..27fe945cd734af7d26ddfc17579a05bda74b0626 100644 (file)
@@ -1387,7 +1387,7 @@ static void *meshlink_main_loop(void *arg) {
                }
 
 #else
-                       pthread_cond_signal(&mesh->cond);
+               pthread_cond_signal(&mesh->cond);
                return NULL;
 #endif // HAVE_SETNS
        }
@@ -2890,16 +2890,45 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
        node_t *n = channel->node;
        meshlink_handle_t *mesh = n->mesh;
+       meshlink_aio_buffer_t *aio = channel->aio;
+
+       if(aio) {
+               /* We at least one AIO buffer. Send as much as possible form the first buffer. */
+               size_t left = aio->len - aio->done;
+
+               if(len > left) {
+                       len = left;
+               }
+
+               ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+
+               if(sent >= 0) {
+                       aio->done += sent;
+               }
+
+               /* If the buffer is now completely sent, call the callback and dispose of it. */
+               if(aio->done >= aio->len) {
+                       channel->aio = aio->next;
+
+                       if(aio->cb) {
+                               aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+                       }
 
-       if(channel->poll_cb) {
-               channel->poll_cb(mesh, channel, len);
+                       free(aio);
+               }
+       } else {
+               if(channel->poll_cb) {
+                       channel->poll_cb(mesh, channel, len);
+               } else {
+                       utcp_set_poll_cb(connection, NULL);
+               }
        }
 }
 
 void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
        (void)mesh;
        channel->poll_cb = cb;
-       utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL);
+       utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL);
 }
 
 void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
@@ -2982,6 +3011,18 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
        }
 
        utcp_close(channel->c);
+
+       /* Clean up any outstanding AIO buffers. */
+       for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) {
+               next = aio->next;
+
+               if(aio->cb) {
+                       aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+               }
+
+               free(aio);
+       }
+
        free(channel);
 }
 
@@ -3005,8 +3046,17 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
        // Then, preferably only if there is room in the receiver window,
        // kick the meshlink thread to go send packets.
 
+       ssize_t retval;
+
        pthread_mutex_lock(&mesh->mesh_mutex);
-       ssize_t retval = utcp_send(channel->c, data, len);
+
+       /* Disallow direct calls to utcp_send() while we still have AIO active. */
+       if(channel->aio) {
+               retval = 0;
+       } else {
+               retval = utcp_send(channel->c, data, len);
+       }
+
        pthread_mutex_unlock(&mesh->mesh_mutex);
 
        if(retval < 0) {
@@ -3016,6 +3066,43 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann
        return retval;
 }
 
+bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || !data) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->data = data;
+       aio->len = len;
+       aio->cb = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       /* Ensure the poll callback is set, and call it right now to push data if possible */
+       utcp_set_poll_cb(channel->c, channel_poll);
+       channel_poll(channel->c, len);
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
 uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
        if(!mesh || !channel) {
                meshlink_errno = MESHLINK_EINVAL;
index 40deb5f1a04a111a0bcaf1bc4e3baa4ed5a92e38..5c5196ab7097ae5e91adb6312017d01ece5ab541 100644 (file)
@@ -1125,6 +1125,34 @@ extern void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *
  */
 extern ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len);
 
+/// A callback for cleaning up buffers submitted for asynchronous I/O.
+/** This callbacks signals that MeshLink has finished using this buffer.
+ *  The ownership of the buffer is now back into the application's hands.
+ *
+ *  @param mesh      A handle which represents an instance of MeshLink.
+ *  @param channel   A handle for the channel which used this buffer.
+ *  @param data      A pointer to a buffer containing the enqueued data.
+ *  @param len       The length of the buffer.
+ *  @param priv      A private pointer which was set by the application when submitting the buffer.
+};
+ */
+typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv);
+
+/// Transmit data on a channel asynchronously
+/** This queues data to send to the remote node.
+ *
+ *  @param mesh         A handle which represents an instance of MeshLink.
+ *  @param channel      A handle for the channel.
+ *  @param data         A pointer to a buffer containing data sent by the source, or NULL if there is no data to send.
+ *                      After meshlink_channel_aio_send() returns, the buffer may not be modified or freed by the application
+ *                      until the callback routine is called.
+ *  @param len          The length of the data, or 0 if there is no data to send.
+ *  @param cb           A pointer to the function which will be called when MeshLink has finished using the buffer.
+ *
+ *  @return             True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
+
 /// Get channel flags.
 /** This returns the flags used when opening this channel.
  *
index 6bf276272fd885f02d99dad16ff71f14612f33b7..05fb66c228a07d61c4d366b30434ade5cd428a26 100644 (file)
@@ -9,6 +9,7 @@ devtool_trybind_probe
 meshlink_add_address
 meshlink_add_external_address
 meshlink_blacklist
+meshlink_channel_aio_send
 meshlink_channel_close
 meshlink_channel_get_flags
 meshlink_channel_get_recvq
index 7e2955eb52ba5d096e2e9c69b8865b9dbd84c7d8..0e3581f5142fa9354350e1efa6451f88dc6e98a7 100644 (file)
@@ -205,12 +205,23 @@ struct meshlink_submesh {
        void *priv;
 };
 
+/// An AIO buffer.
+typedef struct meshlink_aio_buffer {
+       const void *data;
+       size_t len;
+       size_t done;
+       meshlink_aio_cb_t cb;
+       void *priv;
+       struct meshlink_aio_buffer *next;
+} meshlink_aio_buffer_t;
+
 /// A channel.
 struct meshlink_channel {
        struct node_t *node;
        void *priv;
 
        struct utcp_connection *c;
+       meshlink_aio_buffer_t *aio;
        meshlink_channel_receive_cb_t receive_cb;
        meshlink_channel_poll_cb_t poll_cb;
 };