}
/// Transmit data on a channel asynchronously
- /** This queues data to send to the remote node.
+ /** This registers a buffer that will be used to send data to the remote node.
+ * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
+ * While there are still buffers with unsent data, the poll callback will not be called.
*
* @param channel A handle for the channel.
* @param data A pointer to a buffer containing data sent by the source, or NULL if there is no data to send.
return meshlink_channel_aio_send(handle, channel, data, len, cb, priv);
}
+ /// Receive data on a channel asynchronously
+ /** This registers a buffer that will be filled with incoming channel data.
+ * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
+ * While there are still buffers that have not been filled, the receive callback will not be called.
+ *
+ * @param channel A handle for the channel.
+ * @param data A pointer to a buffer that will be filled with incoming data.
+ * After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application
+ * until the callback routine is called.
+ * @param len The length of the data.
+ * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer.
+ *
+ * @return True if the buffer was enqueued, false otherwise.
+ */
+ bool channel_aio_receive(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv);
+ }
+
/// Get the amount of bytes in the send buffer.
/** This returns the amount of bytes in the send buffer.
* These bytes have not been received by the peer yet.
if(n->status.destroyed) {
meshlink_channel_close(mesh, channel);
- } else if(channel->receive_cb) {
- channel->receive_cb(mesh, channel, data, len);
+ return len;
+ }
+
+ const char *p = data;
+ size_t left = len;
+
+ while(channel->aio_receive) {
+ meshlink_aio_buffer_t *aio = channel->aio_receive;
+ size_t todo = aio->len - aio->done;
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ memcpy((char *)aio->data + aio->done, p, todo);
+ aio->done += todo;
+
+ if(aio->done == aio->len) {
+ channel->aio_receive = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
+ p += todo;
+ left -= todo;
+
+ if(!left && len) {
+ return len;
+ }
+ }
+
+ if(channel->receive_cb) {
+ channel->receive_cb(mesh, channel, p, left);
}
return len;
node_t *n = channel->node;
meshlink_handle_t *mesh = n->mesh;
- meshlink_aio_buffer_t *aio = channel->aio;
+ meshlink_aio_buffer_t *aio = channel->aio_send;
if(aio) {
/* We at least one AIO buffer. Send as much as possible form the first buffer. */
/* If the buffer is now completely sent, call the callback and dispose of it. */
if(aio->done >= aio->len) {
- channel->aio = aio->next;
+ channel->aio_send = aio->next;
if(aio->cb) {
aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
(void)mesh;
channel->poll_cb = cb;
- utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL);
+ utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
}
void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
utcp_close(channel->c);
/* Clean up any outstanding AIO buffers. */
- for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) {
+ for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
+ next = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
+ for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
next = aio->next;
if(aio->cb) {
pthread_mutex_lock(&mesh->mesh_mutex);
/* Disallow direct calls to utcp_send() while we still have AIO active. */
- if(channel->aio) {
+ if(channel->aio_send) {
retval = 0;
} else {
retval = utcp_send(channel->c, data, len);
pthread_mutex_lock(&mesh->mesh_mutex);
/* Append the AIO buffer descriptor to the end of the chain */
- meshlink_aio_buffer_t **p = &channel->aio;
+ meshlink_aio_buffer_t **p = &channel->aio_send;
while(*p) {
p = &(*p)->next;
return true;
}
+bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
if(!mesh || !channel) {
meshlink_errno = MESHLINK_EINVAL;
typedef void (*meshlink_aio_cb_t)(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv);
/// Transmit data on a channel asynchronously
-/** This queues data to send to the remote node.
+/** This registers a buffer that will be used to send data to the remote node.
+ * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered.
+ * While there are still buffers with unsent data, the poll callback will not be called.
*
* @param mesh A handle which represents an instance of MeshLink.
* @param channel A handle for the channel.
*/
extern bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
+/// Receive data on a channel asynchronously
+/** This registers a buffer that will be filled with incoming channel data.
+ * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered.
+ * While there are still buffers that have not been filled, the receive callback will not be called.
+ *
+ * @param mesh A handle which represents an instance of MeshLink.
+ * @param channel A handle for the channel.
+ * @param data A pointer to a buffer that will be filled with incoming data.
+ * After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application
+ * until the callback routine is called.
+ * @param len The length of the data.
+ * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer.
+ *
+ * @return True if the buffer was enqueued, false otherwise.
+ */
+extern bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv);
+
/// Get channel flags.
/** This returns the flags used when opening this channel.
*
meshlink_add_address
meshlink_add_external_address
meshlink_blacklist
+meshlink_channel_aio_receive
meshlink_channel_aio_send
meshlink_channel_close
meshlink_channel_get_flags
void *priv;
struct utcp_connection *c;
- meshlink_aio_buffer_t *aio;
+ meshlink_aio_buffer_t *aio_send;
+ meshlink_aio_buffer_t *aio_receive;
meshlink_channel_receive_cb_t receive_cb;
meshlink_channel_poll_cb_t poll_cb;
};
static const size_t size = 10000000; // size of data to transfer
static bool bar_reachable = false;
+static int bar_callbacks = 0;
static int foo_callbacks = 0;
static size_t bar_received = 0;
static struct sync_flag bar_finished_flag;
foo_callbacks++;
}
-void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+void bar_aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, void *priv) {
+ (void)mesh;
(void)channel;
+ (void)data;
+ (void)len;
+ (void)priv;
- char *indata = mesh->priv;
- memcpy(indata, data, len);
- mesh->priv = indata + len;
+ bar_callbacks++;
bar_received += len;
if(bar_received >= size) {
bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) {
assert(port == 7);
+ assert(!data);
+ assert(!len);
+ char *outdata = mesh->priv;
- meshlink_set_channel_receive_cb(mesh, channel, bar_receive_cb);
-
- if(data) {
- bar_receive_cb(mesh, channel, data, len);
- }
+ meshlink_set_channel_receive_cb(mesh, channel, NULL);
+ assert(meshlink_channel_aio_receive(mesh, channel, outdata, size / 4, bar_aio_cb, NULL));
+ assert(meshlink_channel_aio_receive(mesh, channel, outdata + size / 4, size - size / 4, bar_aio_cb, NULL));
return true;
}
assert(wait_sync_flag(&bar_finished_flag, 10));
assert(foo_callbacks == 2);
+ assert(bar_callbacks == 2);
assert(bar_received == size);
assert(!memcmp(indata, outdata, size));