X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=src%2Fmeshlink.c;fp=src%2Fmeshlink.c;h=1b643503f6a9416d72528571697d93a0213e33dd;hb=57114d942004e8a34ff22aadc0c620a0aabbb423;hp=27fe945cd734af7d26ddfc17579a05bda74b0626;hpb=83f02487f4197e4850b54f086c0f9df3f7260297;p=meshlink diff --git a/src/meshlink.c b/src/meshlink.c index 27fe945c..1b643503 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -2819,8 +2819,43 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data if(n->status.destroyed) { meshlink_channel_close(mesh, channel); - } else if(channel->receive_cb) { - channel->receive_cb(mesh, channel, data, len); + return len; + } + + const char *p = data; + size_t left = len; + + while(channel->aio_receive) { + meshlink_aio_buffer_t *aio = channel->aio_receive; + size_t todo = aio->len - aio->done; + + if(todo > left) { + todo = left; + } + + memcpy((char *)aio->data + aio->done, p, todo); + aio->done += todo; + + if(aio->done == aio->len) { + channel->aio_receive = aio->next; + + if(aio->cb) { + aio->cb(mesh, channel, aio->data, aio->len, aio->priv); + } + + free(aio); + } + + p += todo; + left -= todo; + + if(!left && len) { + return len; + } + } + + if(channel->receive_cb) { + channel->receive_cb(mesh, channel, p, left); } return len; @@ -2890,7 +2925,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { node_t *n = channel->node; meshlink_handle_t *mesh = n->mesh; - meshlink_aio_buffer_t *aio = channel->aio; + meshlink_aio_buffer_t *aio = channel->aio_send; if(aio) { /* We at least one AIO buffer. Send as much as possible form the first buffer. */ @@ -2908,7 +2943,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { /* If the buffer is now completely sent, call the callback and dispose of it. */ if(aio->done >= aio->len) { - channel->aio = aio->next; + channel->aio_send = aio->next; if(aio->cb) { aio->cb(mesh, channel, aio->data, aio->len, aio->priv); @@ -2928,7 +2963,7 @@ static void channel_poll(struct utcp_connection *connection, size_t len) { void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { (void)mesh; channel->poll_cb = cb; - utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL); + utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL); } void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) { @@ -3013,7 +3048,17 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel utcp_close(channel->c); /* Clean up any outstanding AIO buffers. */ - for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) { + for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) { + next = aio->next; + + if(aio->cb) { + aio->cb(mesh, channel, aio->data, aio->len, aio->priv); + } + + free(aio); + } + + for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) { next = aio->next; if(aio->cb) { @@ -3051,7 +3096,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann pthread_mutex_lock(&mesh->mesh_mutex); /* Disallow direct calls to utcp_send() while we still have AIO active. */ - if(channel->aio) { + if(channel->aio_send) { retval = 0; } else { retval = utcp_send(channel->c, data, len); @@ -3086,7 +3131,7 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan pthread_mutex_lock(&mesh->mesh_mutex); /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio; + meshlink_aio_buffer_t **p = &channel->aio_send; while(*p) { p = &(*p)->next; @@ -3103,6 +3148,39 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan return true; } +bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { + if(!mesh || !channel) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + if(!len || !data) { + meshlink_errno = MESHLINK_EINVAL; + return false; + } + + meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); + aio->data = data; + aio->len = len; + aio->cb = cb; + aio->priv = priv; + + pthread_mutex_lock(&mesh->mesh_mutex); + + /* Append the AIO buffer descriptor to the end of the chain */ + meshlink_aio_buffer_t **p = &channel->aio_receive; + + while(*p) { + p = &(*p)->next; + } + + *p = aio; + + pthread_mutex_unlock(&mesh->mesh_mutex); + + return true; +} + uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL;