+ closesocket(state.sock);
+
+ pthread_mutex_unlock(&mesh->mutex);
+ return true;
+
+invalid:
+ logger(mesh, MESHLINK_DEBUG, "Invalid invitation URL\n");
+ meshlink_errno = MESHLINK_EINVAL;
+exit:
+ sptps_stop(&state.sptps);
+ ecdsa_free(hiskey);
+ ecdsa_free(key);
+
+ if(state.sock != -1) {
+ closesocket(state.sock);
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+ return false;
+}
+
+char *meshlink_export(meshlink_handle_t *mesh) {
+ if(!mesh) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return NULL;
+ }
+
+ // Create a config file on the fly.
+
+ uint8_t buf[4096];
+ packmsg_output_t out = {buf, sizeof(buf)};
+ packmsg_add_uint32(&out, MESHLINK_CONFIG_VERSION);
+ packmsg_add_str(&out, mesh->name);
+ packmsg_add_str(&out, CORE_MESH);
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ packmsg_add_int32(&out, mesh->self->devclass);
+ packmsg_add_bool(&out, mesh->self->status.blacklisted);
+ packmsg_add_bin(&out, ecdsa_get_public_key(mesh->private_key), 32);
+
+ if(mesh->self->canonical_address && !strchr(mesh->self->canonical_address, ' ')) {
+ char *canonical_address = NULL;
+ xasprintf(&canonical_address, "%s %s", mesh->self->canonical_address, mesh->myport);
+ packmsg_add_str(&out, canonical_address);
+ free(canonical_address);
+ } else {
+ packmsg_add_str(&out, mesh->self->canonical_address ? mesh->self->canonical_address : "");
+ }
+
+ uint32_t count = 0;
+
+ for(uint32_t i = 0; i < MAX_RECENT; i++) {
+ if(mesh->self->recent[i].sa.sa_family) {
+ count++;
+ } else {
+ break;
+ }
+ }
+
+ packmsg_add_array(&out, count);
+
+ for(uint32_t i = 0; i < count; i++) {
+ packmsg_add_sockaddr(&out, &mesh->self->recent[i]);
+ }
+
+ packmsg_add_int64(&out, 0);
+ packmsg_add_int64(&out, 0);
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ if(!packmsg_output_ok(&out)) {
+ logger(mesh, MESHLINK_DEBUG, "Error creating export data\n");
+ meshlink_errno = MESHLINK_EINTERNAL;
+ return NULL;
+ }
+
+ // Prepare a base64-encoded packmsg array containing our config file
+
+ uint32_t len = packmsg_output_size(&out, buf);
+ uint32_t len2 = ((len + 4) * 4) / 3 + 4;
+ uint8_t *buf2 = xmalloc(len2);
+ packmsg_output_t out2 = {buf2, len2};
+ packmsg_add_array(&out2, 1);
+ packmsg_add_bin(&out2, buf, packmsg_output_size(&out, buf));
+
+ if(!packmsg_output_ok(&out2)) {
+ logger(mesh, MESHLINK_DEBUG, "Error creating export data\n");
+ meshlink_errno = MESHLINK_EINTERNAL;
+ free(buf2);
+ return NULL;
+ }
+
+ b64encode_urlsafe(buf2, (char *)buf2, packmsg_output_size(&out2, buf2));
+
+ return (char *)buf2;
+}
+
+bool meshlink_import(meshlink_handle_t *mesh, const char *data) {
+ if(!mesh || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ size_t datalen = strlen(data);
+ uint8_t *buf = xmalloc(datalen);
+ int buflen = b64decode(data, buf, datalen);
+
+ if(!buflen) {
+ logger(mesh, MESHLINK_DEBUG, "Invalid data\n");
+ meshlink_errno = MESHLINK_EPEER;
+ return false;
+ }
+
+ packmsg_input_t in = {buf, buflen};
+ uint32_t count = packmsg_get_array(&in);
+
+ if(!count) {
+ logger(mesh, MESHLINK_DEBUG, "Invalid data\n");
+ meshlink_errno = MESHLINK_EPEER;
+ return false;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ while(count--) {
+ const void *data2;
+ uint32_t len2 = packmsg_get_bin_raw(&in, &data2);
+
+ if(!len2) {
+ break;
+ }
+
+ packmsg_input_t in2 = {data2, len2};
+ uint32_t version = packmsg_get_uint32(&in2);
+ char *name = packmsg_get_str_dup(&in2);
+
+ if(!packmsg_input_ok(&in2) || version != MESHLINK_CONFIG_VERSION || !check_id(name)) {
+ free(name);
+ packmsg_input_invalidate(&in);
+ break;
+ }
+
+ if(!check_id(name)) {
+ free(name);
+ break;
+ }
+
+ node_t *n = lookup_node(mesh, name);
+
+ if(n) {
+ logger(mesh, MESHLINK_DEBUG, "Node %s already exists, not importing\n", name);
+ free(name);
+ continue;
+ }
+
+ n = new_node();
+ n->name = name;
+
+ config_t config = {data2, len2};
+
+ if(!node_read_from_config(mesh, n, &config)) {
+ free_node(n);
+ packmsg_input_invalidate(&in);
+ break;
+ }
+
+ /* Clear the reachability times, since we ourself have never seen these nodes yet */
+ n->last_reachable = 0;
+ n->last_unreachable = 0;
+
+ if(!node_write_config(mesh, n)) {
+ free_node(n);
+ return false;
+ }
+
+ node_add(mesh, n);
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ free(buf);
+
+ if(!packmsg_done(&in)) {
+ logger(mesh, MESHLINK_ERROR, "Invalid data\n");
+ meshlink_errno = MESHLINK_EPEER;
+ return false;
+ }
+
+ if(!config_sync(mesh, "current")) {
+ return false;
+ }
+
+ return true;
+}
+
+static bool blacklist(meshlink_handle_t *mesh, node_t *n) {
+ if(n == mesh->self) {
+ logger(mesh, MESHLINK_ERROR, "%s blacklisting itself?\n", n->name);
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(n->status.blacklisted) {
+ logger(mesh, MESHLINK_DEBUG, "Node %s already blacklisted\n", n->name);
+ return true;
+ }
+
+ n->status.blacklisted = true;
+
+ /* Immediately shut down any connections we have with the blacklisted node.
+ * We can't call terminate_connection(), because we might be called from a callback function.
+ */
+ for list_each(connection_t, c, mesh->connections) {
+ if(c->node == n) {
+ shutdown(c->socket, SHUT_RDWR);
+ }
+ }
+
+ utcp_abort_all_connections(n->utcp);
+
+ n->mtu = 0;
+ n->minmtu = 0;
+ n->maxmtu = MTU;
+ n->mtuprobes = 0;
+ n->status.udp_confirmed = false;
+
+ if(n->status.reachable) {
+ n->last_unreachable = time(NULL);
+ }
+
+ /* Graph updates will suppress status updates for blacklisted nodes, so we need to
+ * manually call the status callback if necessary.
+ */
+ if(n->status.reachable && mesh->node_status_cb) {
+ mesh->node_status_cb(mesh, (meshlink_node_t *)n, false);
+ }
+
+ return node_write_config(mesh, n) && config_sync(mesh, "current");
+}
+
+bool meshlink_blacklist(meshlink_handle_t *mesh, meshlink_node_t *node) {
+ if(!mesh || !node) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ if(!blacklist(mesh, (node_t *)node)) {
+ pthread_mutex_unlock(&mesh->mutex);
+ return false;
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ logger(mesh, MESHLINK_DEBUG, "Blacklisted %s.\n", node->name);
+ return true;
+}
+
+bool meshlink_blacklist_by_name(meshlink_handle_t *mesh, const char *name) {
+ if(!mesh || !name) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ node_t *n = lookup_node(mesh, (char *)name);
+
+ if(!n) {
+ n = new_node();
+ n->name = xstrdup(name);
+ node_add(mesh, n);
+ }
+
+ if(!blacklist(mesh, (node_t *)n)) {
+ pthread_mutex_unlock(&mesh->mutex);
+ return false;
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ logger(mesh, MESHLINK_DEBUG, "Blacklisted %s.\n", name);
+ return true;
+}
+
+static bool whitelist(meshlink_handle_t *mesh, node_t *n) {
+ if(n == mesh->self) {
+ logger(mesh, MESHLINK_ERROR, "%s whitelisting itself?\n", n->name);
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!n->status.blacklisted) {
+ logger(mesh, MESHLINK_DEBUG, "Node %s was already whitelisted\n", n->name);
+ return true;
+ }
+
+ n->status.blacklisted = false;
+
+ if(n->status.reachable) {
+ n->last_reachable = time(NULL);
+ update_node_status(mesh, n);
+ }
+
+ return node_write_config(mesh, n) && config_sync(mesh, "current");
+}
+
+bool meshlink_whitelist(meshlink_handle_t *mesh, meshlink_node_t *node) {
+ if(!mesh || !node) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ if(!whitelist(mesh, (node_t *)node)) {
+ pthread_mutex_unlock(&mesh->mutex);
+ return false;
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ logger(mesh, MESHLINK_DEBUG, "Whitelisted %s.\n", node->name);
+ return true;
+}
+
+bool meshlink_whitelist_by_name(meshlink_handle_t *mesh, const char *name) {
+ if(!mesh || !name) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ node_t *n = lookup_node(mesh, (char *)name);
+
+ if(!n) {
+ n = new_node();
+ n->name = xstrdup(name);
+ node_add(mesh, n);
+ }
+
+ if(!whitelist(mesh, (node_t *)n)) {
+ pthread_mutex_unlock(&mesh->mutex);
+ return false;
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ logger(mesh, MESHLINK_DEBUG, "Whitelisted %s.\n", name);
+ return true;
+}
+
+void meshlink_set_default_blacklist(meshlink_handle_t *mesh, bool blacklist) {
+ mesh->default_blacklist = blacklist;
+}
+
+bool meshlink_forget_node(meshlink_handle_t *mesh, meshlink_node_t *node) {
+ if(!mesh || !node) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ node_t *n = (node_t *)node;
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ /* Check that the node is not reachable */
+ if(n->status.reachable || n->connection) {
+ pthread_mutex_unlock(&mesh->mutex);
+ logger(mesh, MESHLINK_WARNING, "Could not forget %s: still reachable", n->name);
+ return false;
+ }
+
+ /* Check that we don't have any active UTCP connections */
+ if(n->utcp && utcp_is_active(n->utcp)) {
+ pthread_mutex_unlock(&mesh->mutex);
+ logger(mesh, MESHLINK_WARNING, "Could not forget %s: active UTCP connections", n->name);
+ return false;
+ }
+
+ /* Check that we have no active connections to this node */
+ for list_each(connection_t, c, mesh->connections) {
+ if(c->node == n) {
+ pthread_mutex_unlock(&mesh->mutex);
+ logger(mesh, MESHLINK_WARNING, "Could not forget %s: active connection", n->name);
+ return false;
+ }
+ }
+
+ /* Remove any pending outgoings to this node */
+ if(mesh->outgoings) {
+ for list_each(outgoing_t, outgoing, mesh->outgoings) {
+ if(outgoing->node == n) {
+ list_delete_node(mesh->outgoings, list_node);
+ }
+ }
+ }
+
+ /* Delete the config file for this node */
+ if(!config_delete(mesh, "current", n->name)) {
+ pthread_mutex_unlock(&mesh->mutex);
+ return false;
+ }
+
+ /* Delete the node struct and any remaining edges referencing this node */
+ node_del(mesh, n);
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ return config_sync(mesh, "current");
+}
+
+/* Hint that a hostname may be found at an address
+ * See header file for detailed comment.
+ */
+void meshlink_hint_address(meshlink_handle_t *mesh, meshlink_node_t *node, const struct sockaddr *addr) {
+ if(!mesh || !node || !addr) {
+ meshlink_errno = EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ node_t *n = (node_t *)node;
+
+ if(node_add_recent_address(mesh, n, (sockaddr_t *)addr)) {
+ if(!node_write_config(mesh, n)) {
+ logger(mesh, MESHLINK_DEBUG, "Could not update %s\n", n->name);
+ }
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+ // @TODO do we want to fire off a connection attempt right away?
+}
+
+static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
+ (void)port;
+ node_t *n = utcp->priv;
+ meshlink_handle_t *mesh = n->mesh;
+ return mesh->channel_accept_cb;
+}
+
+/* Finish one AIO buffer, return true if the channel is still open. */
+static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ meshlink_aio_buffer_t *aio = *head;
+ *head = aio->next;
+
+ if(channel->c) {
+ channel->in_callback = true;
+
+ if(aio->data) {
+ if(aio->cb.buffer) {
+ aio->cb.buffer(mesh, channel, aio->data, aio->done, aio->priv);
+ }
+ } else {
+ if(aio->cb.fd) {
+ aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+ }
+ }
+
+ channel->in_callback = false;
+
+ if(!channel->c) {
+ free(aio);
+ free(channel);
+ return false;
+ }
+ }
+
+ free(aio);
+ return true;
+}
+
+/* Finish all AIO buffers, return true if the channel is still open. */
+static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
+ while(*head) {
+ if(!aio_finish_one(mesh, channel, head)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
+ meshlink_channel_t *channel = connection->priv;
+
+ if(!channel) {
+ abort();
+ }
+
+ node_t *n = channel->node;
+ meshlink_handle_t *mesh = n->mesh;
+
+ if(n->status.destroyed) {
+ meshlink_channel_close(mesh, channel);
+ return len;
+ }
+
+ const char *p = data;
+ size_t left = len;
+
+ while(channel->aio_receive) {
+ if(!len) {
+ /* This receive callback signalled an error, abort all outstanding AIO buffers. */
+ if(!aio_abort(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
+ break;
+ }
+
+ meshlink_aio_buffer_t *aio = channel->aio_receive;
+ size_t todo = aio->len - aio->done;
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ if(aio->data) {
+ memcpy((char *)aio->data + aio->done, p, todo);
+ } else {
+ ssize_t result = write(aio->fd, p, todo);
+
+ if(result <= 0) {
+ if(result < 0 && errno == EINTR) {
+ continue;
+ }
+
+ /* Writing to fd failed, cancel just this AIO buffer. */
+ logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
+
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+
+ continue;
+ }
+
+ todo = result;
+ }
+
+ aio->done += todo;
+ p += todo;
+ left -= todo;
+
+ if(aio->done == aio->len) {
+ if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
+ return len;
+ }
+ }
+
+ if(!left) {
+ return len;
+ }
+ }
+
+ if(channel->receive_cb) {
+ channel->receive_cb(mesh, channel, p, left);
+ }
+
+ return len;
+}
+
+static void channel_accept(struct utcp_connection *utcp_connection, uint16_t port) {
+ node_t *n = utcp_connection->utcp->priv;
+
+ if(!n) {
+ abort();
+ }
+
+ meshlink_handle_t *mesh = n->mesh;
+
+ if(!mesh->channel_accept_cb) {
+ return;
+ }
+
+ meshlink_channel_t *channel = xzalloc(sizeof(*channel));
+ channel->node = n;
+ channel->c = utcp_connection;
+
+ if(mesh->channel_accept_cb(mesh, channel, port, NULL, 0)) {
+ utcp_accept(utcp_connection, channel_recv, channel);
+ } else {
+ free(channel);
+ }
+}
+
+static void channel_retransmit(struct utcp_connection *utcp_connection) {
+ node_t *n = utcp_connection->utcp->priv;
+ meshlink_handle_t *mesh = n->mesh;
+
+ if(n->mtuprobes == 31) {
+ timeout_set(&mesh->loop, &n->mtutimeout, &(struct timespec) {
+ 0, 0
+ });
+ }
+}
+
+static ssize_t channel_send(struct utcp *utcp, const void *data, size_t len) {
+ node_t *n = utcp->priv;
+
+ if(n->status.destroyed) {
+ return -1;
+ }
+
+ meshlink_handle_t *mesh = n->mesh;
+ return meshlink_send_immediate(mesh, (meshlink_node_t *)n, data, len) ? (ssize_t)len : -1;
+}
+
+void meshlink_set_channel_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_receive_cb_t cb) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ channel->receive_cb = cb;
+}
+
+static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, const void *data, size_t len) {
+ (void)mesh;
+ node_t *n = (node_t *)source;
+
+ if(!n->utcp) {
+ abort();
+ }
+
+ utcp_recv(n->utcp, data, len);
+}
+
+static void channel_poll(struct utcp_connection *connection, size_t len) {
+ meshlink_channel_t *channel = connection->priv;
+
+ if(!channel) {
+ abort();
+ }
+
+ node_t *n = channel->node;
+ meshlink_handle_t *mesh = n->mesh;
+
+ while(channel->aio_send) {
+ if(!len) {
+ /* This poll callback signalled an error, abort all outstanding AIO buffers. */
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ break;
+ }
+
+ /* We have at least one AIO buffer. Send as much as possible from the buffers. */
+ meshlink_aio_buffer_t *aio = channel->aio_send;
+ size_t todo = aio->len - aio->done;
+ ssize_t sent;
+
+ if(todo > len) {
+ todo = len;
+ }
+
+ if(aio->data) {
+ sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
+ } else {
+ /* Limit the amount we read at once to avoid stack overflows */
+ if(todo > 65536) {
+ todo = 65536;
+ }
+
+ char buf[todo];
+ ssize_t result = read(aio->fd, buf, todo);
+
+ if(result > 0) {
+ todo = result;
+ sent = utcp_send(connection, buf, todo);
+ } else {
+ if(result < 0 && errno == EINTR) {
+ continue;
+ }
+
+ /* Reading from fd failed, cancel just this AIO buffer. */
+ if(result != 0) {
+ logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
+ }
+
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ continue;
+ }
+ }
+
+ if(sent != (ssize_t)todo) {
+ /* We should never get a partial send at this point */
+ assert(sent <= 0);
+
+ /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
+ if(!aio_abort(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ len = 0;
+ break;
+ }
+
+ aio->done += sent;
+ len -= sent;
+
+ /* If we didn't finish this buffer, exit early. */
+ if(aio->done < aio->len) {
+ return;
+ }
+
+ /* Signal completion of this buffer, and go to the next one. */
+ if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
+ return;
+ }
+
+ if(!len) {
+ return;
+ }
+ }
+
+ if(channel->poll_cb) {
+ channel->poll_cb(mesh, channel, len);
+ } else {
+ utcp_set_poll_cb(connection, NULL);
+ }
+}
+
+void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+ channel->poll_cb = cb;
+ utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
+ if(!mesh) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+ mesh->channel_accept_cb = cb;
+ mesh->receive_cb = channel_receive;
+
+ for splay_each(node_t, n, mesh->nodes) {
+ if(!n->utcp && n != mesh->self) {
+ n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
+ utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
+ utcp_set_retransmit_cb(n->utcp, channel_retransmit);
+ }
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+ utcp_set_sndbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+ utcp_set_rcvbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
+ if(data && len) {
+ abort(); // TODO: handle non-NULL data
+ }
+
+ if(!mesh || !node) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return NULL;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ node_t *n = (node_t *)node;
+
+ if(!n->utcp) {
+ n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
+ utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
+ utcp_set_retransmit_cb(n->utcp, channel_retransmit);
+ mesh->receive_cb = channel_receive;
+
+ if(!n->utcp) {
+ meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL;
+ pthread_mutex_unlock(&mesh->mutex);
+ return NULL;
+ }
+ }
+
+ if(n->status.blacklisted) {
+ logger(mesh, MESHLINK_ERROR, "Cannot open a channel with blacklisted node\n");
+ meshlink_errno = MESHLINK_EBLACKLISTED;
+ pthread_mutex_unlock(&mesh->mutex);
+ return NULL;
+ }
+
+ meshlink_channel_t *channel = xzalloc(sizeof(*channel));
+ channel->node = n;
+ channel->receive_cb = cb;
+
+ if(data && !len) {
+ channel->priv = (void *)data;
+ }
+
+ channel->c = utcp_connect_ex(n->utcp, port, channel_recv, channel, flags);
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ if(!channel->c) {
+ meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL;
+ free(channel);
+ return NULL;
+ }
+
+ return channel;
+}
+
+meshlink_channel_t *meshlink_channel_open(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len) {
+ return meshlink_channel_open_ex(mesh, node, port, cb, data, len, MESHLINK_CHANNEL_TCP);
+}
+
+void meshlink_channel_shutdown(meshlink_handle_t *mesh, meshlink_channel_t *channel, int direction) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+ utcp_shutdown(channel->c, direction);
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ if(channel->c) {
+ utcp_close(channel->c);
+ channel->c = NULL;
+
+ /* Clean up any outstanding AIO buffers. */
+ aio_abort(mesh, channel, &channel->aio_send);
+ aio_abort(mesh, channel, &channel->aio_receive);
+ }
+
+ if(!channel->in_callback) {
+ free(channel);
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
+ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return -1;
+ }
+
+ if(!len) {
+ return 0;
+ }
+
+ if(!data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return -1;
+ }
+
+ // TODO: more finegrained locking.
+ // Ideally we want to put the data into the UTCP connection's send buffer.
+ // Then, preferably only if there is room in the receiver window,
+ // kick the meshlink thread to go send packets.
+
+ ssize_t retval;
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ /* Disallow direct calls to utcp_send() while we still have AIO active. */
+ if(channel->aio_send) {
+ retval = 0;
+ } else {
+ retval = utcp_send(channel->c, data, len);
+ }
+
+ pthread_mutex_unlock(&mesh->mutex);
+
+ if(retval < 0) {
+ meshlink_errno = MESHLINK_ENETWORK;
+ }
+
+ return retval;
+}
+
+bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }