+ pthread_mutex_lock(&mesh->mutex);
+
+ node_t *n = (node_t *)node;
+
+ if(node_add_recent_address(mesh, n, (sockaddr_t *)addr)) {
+ if(!node_write_config(mesh, n)) {
+ logger(mesh, MESHLINK_DEBUG, "Could not update %s\n", n->name);
+ }
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+ // @TODO do we want to fire off a connection attempt right away?
+}
+
+static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
+ (void)port;
+ node_t *n = utcp->priv;
+ meshlink_handle_t *mesh = n->mesh;
+ return mesh->channel_accept_cb;
+}
+
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ meshlink_aio_buffer_t *aio = *head;
+ *head = aio->next;
+
+ if(channel->c) {
+ channel->in_callback = true;
+
+ if(aio->data) {
+ if(aio->cb.buffer) {
+ aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+ } else {
+ if(aio->cb.fd) {
+ aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+ }
+ }
+
+ channel->in_callback = false;
+
+ if(!channel->c) {
+ free(aio);
+ free(channel);
+ return false;
+ }
+ }
+
+ free(aio);
+ return true;
+}
+
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ while(*head) {
+ if(!aio_finish_one(mesh, channel, head)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
+ meshlink_channel_t *channel = connection->priv;
+
+ if(!channel) {
+ abort();
+ }
+
+ node_t *n = channel->node;
+ meshlink_handle_t *mesh = n->mesh;
+
+ if(n->status.destroyed) {
+ meshlink_channel_close(mesh, channel);
+ return len;
+ }
+
+ const char *p = data;
+ size_t left = len;
+
+ while(channel->aio_receive) {
+ if(!len) {
+ /* This receive callback signalled an error, abort all outstanding AIO buffers. */
+ if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
+ break;
+ }
+
+ meshlink_aio_buffer_t *aio = channel->aio_receive;
+ size_t todo = aio->len - aio->done;
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ if(aio->data) {
+ memcpy((char *)aio->data + aio->done, p, todo);
+ } else {
+ ssize_t result = write(aio->fd, p, todo);
+
+ if(result <= 0) {
+ if(result < 0 && errno == EINTR) {
+ continue;
+ }
+
+ /* Writing to fd failed, cancel just this AIO buffer. */
+ logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
+
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
+ continue;
+ }
+
+ todo = result;
+ }
+
+ aio->done += todo;
+ p += todo;
+ left -= todo;
+
+ if(aio->done == aio->len) {
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+ }
+
+ if(!left) {
+ return len;
+ }
+ }
+
+ if(channel->receive_cb) {
+ channel->receive_cb(mesh, channel, p, left);
+ }
+
+ return len;
+}
+
+static void channel_accept(struct utcp_connection *utcp_connection, uint16_t port) {
+ node_t *n = utcp_connection->utcp->priv;
+
+ if(!n) {
+ abort();
+ }
+
+ meshlink_handle_t *mesh = n->mesh;
+
+ if(!mesh->channel_accept_cb) {
+ return;
+ }
+
+ meshlink_channel_t *channel = xzalloc(sizeof(*channel));
+ channel->node = n;
+ channel->c = utcp_connection;
+
+ if(mesh->channel_accept_cb(mesh, channel, port, NULL, 0)) {
+ utcp_accept(utcp_connection, channel_recv, channel);
+ } else {
+ free(channel);
+ }
+}
+
+static void channel_retransmit(struct utcp_connection *utcp_connection) {
+ node_t *n = utcp_connection->utcp->priv;
+ meshlink_handle_t *mesh = n->mesh;
+
+ if(n->mtuprobes == 31) {
+ timeout_set(&mesh->loop, &n->mtutimeout, &(struct timespec) {
+ 0, 0
+ });
+ }
+}
+
+static ssize_t channel_send(struct utcp *utcp, const void *data, size_t len) {
+ node_t *n = utcp->priv;
+
+ if(n->status.destroyed) {
+ return -1;
+ }
+
+ meshlink_handle_t *mesh = n->mesh;
+ return meshlink_send_immediate(mesh, (meshlink_node_t *)n, data, len) ? (ssize_t)len : -1;
+}
+
+void meshlink_set_channel_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_receive_cb_t cb) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ channel->receive_cb = cb;
+}
+
+static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, const void *data, size_t len) {
+ (void)mesh;
+ node_t *n = (node_t *)source;
+
+ if(!n->utcp) {
+ abort();
+ }
+
+ utcp_recv(n->utcp, data, len);
+}
+
+static void channel_poll(struct utcp_connection *connection, size_t len) {
+ meshlink_channel_t *channel = connection->priv;
+
+ if(!channel) {
+ abort();
+ }
+
+ node_t *n = channel->node;
+ meshlink_handle_t *mesh = n->mesh;
+
+ while(channel->aio_send) {
+ if(!len) {
+ /* This poll callback signalled an error, abort all outstanding AIO buffers. */
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ break;
+ }
+
+ /* We have at least one AIO buffer. Send as much as possible from the buffers. */
+ meshlink_aio_buffer_t *aio = channel->aio_send;
+ size_t todo = aio->len - aio->done;
+ ssize_t sent;
+
+ if(todo > len) {
+ todo = len;
+ }
+
+ if(aio->data) {
+ sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
+ } else {
+ char buf[todo];
+ ssize_t result = read(aio->fd, buf, todo);
+
+ if(result > 0) {
+ todo = result;
+ sent = utcp_send(connection, buf, todo);
+ } else {
+ if(result < 0 && errno == EINTR) {
+ continue;
+ }
+
+ /* Reading from fd failed, cancel just this AIO buffer. */
+ if(result != 0) {
+ logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
+ }
+
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ continue;
+ }
+ }
+
+ if(sent != (ssize_t)todo) {
+ /* We should never get a partial send at this point */
+ assert(sent < 0);
+
+ /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ len = 0;
+ break;
+ }
+
+ aio->done += sent;
+ len -= sent;
+
+ /* If we didn't finish this buffer, exit early. */
+ if(aio->done < aio->len) {
+ return;
+ }
+
+ /* Signal completion of this buffer, and go to the next one. */
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ if(!len) {
+ return;
+ }
+ }
+
+ if(channel->poll_cb) {
+ channel->poll_cb(mesh, channel, len);
+ } else {
+ utcp_set_poll_cb(connection, NULL);
+ }
+}
+
+void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+ channel->poll_cb = cb;
+ utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
+ if(!mesh) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);