X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=src%2Fmeshlink.c;h=42573c8ffa6f68837fb1e06f3a567f0baefbdbd9;hb=c023ad12147aa88810629c110ea6b1ab94267196;hp=20dad9f43caf437d15155931f2d528b3e1a30e42;hpb=dceaa0b7940464df45104e02b88e0ea3283bb938;p=meshlink diff --git a/src/meshlink.c b/src/meshlink.c index 20dad9f4..42573c8f 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -1387,7 +1387,7 @@ static void *meshlink_main_loop(void *arg) { } #else - pthread_cond_signal(&mesh->cond); + pthread_cond_signal(&mesh->cond); return NULL; #endif // HAVE_SETNS } @@ -2807,6 +2807,18 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { return mesh->channel_accept_cb; } +static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) { + if(aio->data) { + if(aio->cb.buffer) { + aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); + } + } else { + if(aio->cb.fd) { + aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + } + } +} + static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -2819,8 +2831,48 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data if(n->status.destroyed) { meshlink_channel_close(mesh, channel); - } else if(channel->receive_cb) { - channel->receive_cb(mesh, channel, data, len); + return len; + } + + const char *p = data; + size_t left = len; + + while(channel->aio_receive) { + meshlink_aio_buffer_t *aio = channel->aio_receive; + size_t todo = aio->len - aio->done; + + if(todo > left) { + todo = left; + } + + if(aio->data) { + memcpy((char *)aio->data + aio->done, p, todo); + } else { + ssize_t result = write(aio->fd, p, todo); + + if(result > 0) { + todo = result; + } + } + + aio->done += todo; + + if(aio->done == aio->len) { + channel->aio_receive = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + } + + p += todo; + left -= todo; + + if(!left && len) { + return len; + } + } + + if(channel->receive_cb) { + channel->receive_cb(mesh, channel, p, left); } return len; @@ -2890,16 +2942,63 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { node_t *n = channel->node; meshlink_handle_t *mesh = n->mesh; + meshlink_aio_buffer_t *aio = channel->aio_send; + + if(aio) { + /* We at least one AIO buffer. Send as much as possible form the first buffer. */ + size_t left = aio->len - aio->done; + ssize_t sent; + + if(len > left) { + len = left; + } + + if(aio->data) { + sent = utcp_send(connection, (char *)aio->data + aio->done, len); + } else { + char buf[65536]; + size_t todo = utcp_get_sndbuf_free(connection); + + if(todo > left) { + todo = left; + } + + if(todo > sizeof(buf)) { + todo = sizeof(buf); + } + + ssize_t result = read(aio->fd, buf, todo); + + if(result > 0) { + sent = utcp_send(connection, buf, result); + } else { + sent = result; + } + } - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, len); + if(sent >= 0) { + aio->done += sent; + } + + /* If the buffer is now completely sent, call the callback and dispose of it. */ + if(aio->done >= aio->len) { + channel->aio_send = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + } + } else { + if(channel->poll_cb) { + channel->poll_cb(mesh, channel, len); + } else { + utcp_set_poll_cb(connection, NULL); + } } } void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { (void)mesh; channel->poll_cb = cb; - utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL); + utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL); } void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) { @@ -2921,6 +3020,32 @@ void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_ac pthread_mutex_unlock(&mesh->mesh_mutex); } +void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { + (void)mesh; + + if(!channel) { + meshlink_errno = MESHLINK_EINVAL; + return; + } + + pthread_mutex_lock(&mesh->mesh_mutex); + utcp_set_sndbuf(channel->c, size); + pthread_mutex_unlock(&mesh->mesh_mutex); +} + +void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { + (void)mesh; + + if(!channel) { + meshlink_errno = MESHLINK_EINVAL; + return; + } + + pthread_mutex_lock(&mesh->mesh_mutex); + utcp_set_rcvbuf(channel->c, size); + pthread_mutex_unlock(&mesh->mesh_mutex); +} + meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) { if(data || len) { abort(); // TODO: handle non-NULL data @@ -2982,6 +3107,20 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel } utcp_close(channel->c); + + /* Clean up any outstanding AIO buffers. */ + for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { + next = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + } + + for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { + next = aio->next; + aio_signal(mesh, channel, aio); + free(aio); + } + free(channel); } @@ -3005,8 +3144,17 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann // Then, preferably only if there is room in the receiver window, // kick the meshlink thread to go send packets. + ssize_t retval; + pthread_mutex_lock(&mesh->mesh_mutex); - ssize_t retval = utcp_send(channel->c, data, len); + + /* Disallow direct calls to utcp_send() while we still have AIO active. */ + if(channel->aio_send) { + retval = 0; + } else { + retval = utcp_send(channel->c, data, len); + } + pthread_mutex_unlock(&mesh->mesh_mutex); if(retval < 0) { @@ -3016,6 +3164,146 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann return retval; } +bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || !data) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->data = data; + aio->len = len; + aio->cb.buffer = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_send; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + /* Ensure the poll callback is set, and call it right now to push data if possible */ + utcp_set_poll_cb(channel->c, channel_poll); + channel_poll(channel->c, len); + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || fd == -1) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->fd = fd; + aio->len = len; + aio->cb.fd = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_send; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + /* Ensure the poll callback is set, and call it right now to push data if possible */ + utcp_set_poll_cb(channel->c, channel_poll); + channel_poll(channel->c, len); + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || !data) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->data = data; + aio->len = len; + aio->cb.buffer = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_receive; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || fd == -1) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->fd = fd; + aio->len = len; + aio->cb.fd = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_receive; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL;