X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=src%2Fmeshlink.c;h=42573c8ffa6f68837fb1e06f3a567f0baefbdbd9;hb=c023ad12147aa88810629c110ea6b1ab94267196;hp=1d33a5897ad07155c8413794a7cbc3f77373d4c4;hpb=a18da7ad8a0d9f28dabf97d15407ece748538888;p=meshlink diff --git a/src/meshlink.c b/src/meshlink.c index 1d33a589..42573c8f 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -2807,6 +2807,18 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { return mesh->channel_accept_cb; } +static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) { + if(aio->data) { + if(aio->cb.buffer) { + aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv); + } + } else { + if(aio->cb.fd) { + aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); + } + } +} + static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -2833,16 +2845,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data todo = left; } - memcpy((char *)aio->data + aio->done, p, todo); + if(aio->data) { + memcpy((char *)aio->data + aio->done, p, todo); + } else { + ssize_t result = write(aio->fd, p, todo); + + if(result > 0) { + todo = result; + } + } + aio->done += todo; if(aio->done == aio->len) { channel->aio_receive = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } @@ -2930,12 +2947,34 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { if(aio) { /* We at least one AIO buffer. Send as much as possible form the first buffer. */ size_t left = aio->len - aio->done; + ssize_t sent; if(len > left) { len = left; } - ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len); + if(aio->data) { + sent = utcp_send(connection, (char *)aio->data + aio->done, len); + } else { + char buf[65536]; + size_t todo = utcp_get_sndbuf_free(connection); + + if(todo > left) { + todo = left; + } + + if(todo > sizeof(buf)) { + todo = sizeof(buf); + } + + ssize_t result = read(aio->fd, buf, todo); + + if(result > 0) { + sent = utcp_send(connection, buf, result); + } else { + sent = result; + } + } if(sent >= 0) { aio->done += sent; @@ -2944,11 +2983,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { /* If the buffer is now completely sent, call the callback and dispose of it. */ if(aio->done >= aio->len) { channel->aio_send = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } } else { @@ -3076,21 +3111,13 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel /* Clean up any outstanding AIO buffers. */ for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { next = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { next = aio->next; - - if(aio->cb) { - aio->cb(mesh, channel, aio->data, aio->len, aio->priv); - } - + aio_signal(mesh, channel, aio); free(aio); } @@ -3151,7 +3178,44 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); aio->data = data; aio->len = len; - aio->cb = cb; + aio->cb.buffer = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_send; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + /* Ensure the poll callback is set, and call it right now to push data if possible */ + utcp_set_poll_cb(channel->c, channel_poll); + channel_poll(channel->c, len); + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || fd == -1) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->fd = fd; + aio->len = len; + aio->cb.fd = cb; aio->priv = priv; pthread_mutex_lock(&mesh->mesh_mutex); @@ -3188,7 +3252,40 @@ bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *c meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); aio->data = data; aio->len = len; - aio->cb = cb; + aio->cb.buffer = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_receive; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + +bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || fd == -1) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->fd = fd; + aio->len = len; + aio->cb.fd = cb; aio->priv = priv; pthread_mutex_lock(&mesh->mesh_mutex);