}
#else
- pthread_cond_signal(&mesh->cond);
+ pthread_cond_signal(&mesh->cond);
return NULL;
#endif // HAVE_SETNS
}
node_t *n = channel->node;
meshlink_handle_t *mesh = n->mesh;
+ meshlink_aio_buffer_t *aio = channel->aio;
+
+ if(aio) {
+ /* We at least one AIO buffer. Send as much as possible form the first buffer. */
+ size_t left = aio->len - aio->done;
+
+ if(len > left) {
+ len = left;
+ }
+
+ ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+
+ if(sent >= 0) {
+ aio->done += sent;
+ }
+
+ /* If the buffer is now completely sent, call the callback and dispose of it. */
+ if(aio->done >= aio->len) {
+ channel->aio = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
- if(channel->poll_cb) {
- channel->poll_cb(mesh, channel, len);
+ free(aio);
+ }
+ } else {
+ if(channel->poll_cb) {
+ channel->poll_cb(mesh, channel, len);
+ } else {
+ utcp_set_poll_cb(connection, NULL);
+ }
}
}
void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
(void)mesh;
channel->poll_cb = cb;
- utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL);
+ utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL);
}
void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
}
utcp_close(channel->c);
+
+ /* Clean up any outstanding AIO buffers. */
+ for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) {
+ next = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
free(channel);
}
// Then, preferably only if there is room in the receiver window,
// kick the meshlink thread to go send packets.
+ ssize_t retval;
+
pthread_mutex_lock(&mesh->mesh_mutex);
- ssize_t retval = utcp_send(channel->c, data, len);
+
+ /* Disallow direct calls to utcp_send() while we still have AIO active. */
+ if(channel->aio) {
+ retval = 0;
+ } else {
+ retval = utcp_send(channel->c, data, len);
+ }
+
pthread_mutex_unlock(&mesh->mesh_mutex);
if(retval < 0) {
return retval;
}
+bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ /* Ensure the poll callback is set, and call it right now to push data if possible */
+ utcp_set_poll_cb(channel->c, channel_poll);
+ channel_poll(channel->c, len);
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
if(!mesh || !channel) {
meshlink_errno = MESHLINK_EINVAL;