]> git.meshlink.io Git - meshlink/blobdiff - src/meshlink.c
Add support for AIO using filedescriptors.
[meshlink] / src / meshlink.c
index 1d33a5897ad07155c8413794a7cbc3f77373d4c4..42573c8ffa6f68837fb1e06f3a567f0baefbdbd9 100644 (file)
@@ -2807,6 +2807,18 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
        return mesh->channel_accept_cb;
 }
 
+static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
+       if(aio->data) {
+               if(aio->cb.buffer) {
+                       aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+               }
+       } else {
+               if(aio->cb.fd) {
+                       aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+               }
+       }
+}
+
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
@@ -2833,16 +2845,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        todo = left;
                }
 
-               memcpy((char *)aio->data + aio->done, p, todo);
+               if(aio->data) {
+                       memcpy((char *)aio->data + aio->done, p, todo);
+               } else {
+                       ssize_t result = write(aio->fd, p, todo);
+
+                       if(result > 0) {
+                               todo = result;
+                       }
+               }
+
                aio->done += todo;
 
                if(aio->done == aio->len) {
                        channel->aio_receive = aio->next;
-
-                       if(aio->cb) {
-                               aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-                       }
-
+                       aio_signal(mesh, channel, aio);
                        free(aio);
                }
 
@@ -2930,12 +2947,34 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
        if(aio) {
                /* We at least one AIO buffer. Send as much as possible form the first buffer. */
                size_t left = aio->len - aio->done;
+               ssize_t sent;
 
                if(len > left) {
                        len = left;
                }
 
-               ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+               if(aio->data) {
+                       sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+               } else {
+                       char buf[65536];
+                       size_t todo = utcp_get_sndbuf_free(connection);
+
+                       if(todo > left) {
+                               todo = left;
+                       }
+
+                       if(todo > sizeof(buf)) {
+                               todo = sizeof(buf);
+                       }
+
+                       ssize_t result = read(aio->fd, buf, todo);
+
+                       if(result > 0) {
+                               sent = utcp_send(connection, buf, result);
+                       } else {
+                               sent = result;
+                       }
+               }
 
                if(sent >= 0) {
                        aio->done += sent;
@@ -2944,11 +2983,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                /* If the buffer is now completely sent, call the callback and dispose of it. */
                if(aio->done >= aio->len) {
                        channel->aio_send = aio->next;
-
-                       if(aio->cb) {
-                               aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-                       }
-
+                       aio_signal(mesh, channel, aio);
                        free(aio);
                }
        } else {
@@ -3076,21 +3111,13 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
        /* Clean up any outstanding AIO buffers. */
        for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
                next = aio->next;
-
-               if(aio->cb) {
-                       aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-               }
-
+               aio_signal(mesh, channel, aio);
                free(aio);
        }
 
        for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
                next = aio->next;
-
-               if(aio->cb) {
-                       aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
-               }
-
+               aio_signal(mesh, channel, aio);
                free(aio);
        }
 
@@ -3151,7 +3178,44 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
        meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
        aio->data = data;
        aio->len = len;
-       aio->cb = cb;
+       aio->cb.buffer = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_send;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       /* Ensure the poll callback is set, and call it right now to push data if possible */
+       utcp_set_poll_cb(channel->c, channel_poll);
+       channel_poll(channel->c, len);
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || fd == -1) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->fd = fd;
+       aio->len = len;
+       aio->cb.fd = cb;
        aio->priv = priv;
 
        pthread_mutex_lock(&mesh->mesh_mutex);
@@ -3188,7 +3252,40 @@ bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *c
        meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
        aio->data = data;
        aio->len = len;
-       aio->cb = cb;
+       aio->cb.buffer = cb;
+       aio->priv = priv;
+
+       pthread_mutex_lock(&mesh->mesh_mutex);
+
+       /* Append the AIO buffer descriptor to the end of the chain */
+       meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+       while(*p) {
+               p = &(*p)->next;
+       }
+
+       *p = aio;
+
+       pthread_mutex_unlock(&mesh->mesh_mutex);
+
+       return true;
+}
+
+bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+       if(!mesh || !channel) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       if(!len || fd == -1) {
+               meshlink_errno = MESHLINK_EINVAL;
+               return false;
+       }
+
+       meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+       aio->fd = fd;
+       aio->len = len;
+       aio->cb.fd = cb;
        aio->priv = priv;
 
        pthread_mutex_lock(&mesh->mesh_mutex);