-static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
- (void)port;
- node_t *n = utcp->priv;
- meshlink_handle_t *mesh = n->mesh;
-
- if(mesh->channel_accept_cb && mesh->channel_listen_cb) {
- return mesh->channel_listen_cb(mesh, (meshlink_node_t *)n, port);
- } else {
- return mesh->channel_accept_cb;
- }
-}
-
-static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
- meshlink_channel_t *channel = connection->priv;
-
- if(!channel) {
- abort();
- }
-
- node_t *n = channel->node;
- meshlink_handle_t *mesh = n->mesh;
-
- if(n->status.destroyed) {
- meshlink_channel_close(mesh, channel);
- return len;
- }
-
- const char *p = data;
- size_t left = len;
-
- if(channel->receive_cb) {
- channel->receive_cb(mesh, channel, p, left);
- }
-
- return len;
-}
-
-static void channel_accept(struct utcp_connection *utcp_connection, uint16_t port) {
- node_t *n = utcp_connection->utcp->priv;
-
- if(!n) {
- abort();
- }
-
- meshlink_handle_t *mesh = n->mesh;
-
- if(!mesh->channel_accept_cb) {
- return;
- }
-
- meshlink_channel_t *channel = xzalloc(sizeof(*channel));
- channel->node = n;
- channel->c = utcp_connection;
-
- if(mesh->channel_accept_cb(mesh, channel, port, NULL, 0)) {
- utcp_accept(utcp_connection, channel_recv, channel);
- } else {
- free(channel);
- }
-}
-
-static ssize_t channel_send(struct utcp *utcp, const void *data, size_t len) {
- node_t *n = utcp->priv;
-
- if(n->status.destroyed) {
- return -1;
- }
-
- meshlink_handle_t *mesh = n->mesh;
- return meshlink_send_immediate(mesh, (meshlink_node_t *)n, data, len) ? (ssize_t)len : -1;
-}
-
-void meshlink_set_channel_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_receive_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_receive_cb(%p, %p)", (void *)channel, (void *)(intptr_t)cb);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- channel->receive_cb = cb;
-}
-
-static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, const void *data, size_t len) {
- (void)mesh;
- node_t *n = (node_t *)source;
-
- if(!n->utcp) {
- abort();
- }
-
- utcp_recv(n->utcp, data, len);
-}
-
-void meshlink_set_channel_listen_cb(meshlink_handle_t *mesh, meshlink_channel_listen_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_listen_cb(%p)", (void *)(intptr_t)cb);
-
- if(!mesh) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->channel_listen_cb = cb;
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_accept_cb(%p)", (void *)(intptr_t)cb);
-
- if(!mesh) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->channel_accept_cb = cb;
- mesh->receive_cb = channel_receive;
-
- if(mesh->peer) {
- mesh->peer->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, mesh->peer);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_channel_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint32_t flags) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_flags(%p, %u)", (void *)channel, flags);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- utcp_set_flags(channel->c, flags);
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_open_ex(%s, %u, %p, %p, %zu, %u)", node ? node->name : "(null)", port, (void *)(intptr_t)cb, data, len, flags);
-
- if(data && len) {
- abort(); // TODO: handle non-NULL data
- }
-
- if(!mesh || !node) {
- meshlink_errno = MESHLINK_EINVAL;
- return NULL;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- node_t *n = (node_t *)node;
-
- if(!n->utcp) {
- n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
- mesh->receive_cb = channel_receive;
-
- if(!n->utcp) {
- meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL;
- pthread_mutex_unlock(&mesh->mutex);
- return NULL;
- }
- }
-
- if(n->status.blacklisted) {
- logger(mesh, MESHLINK_ERROR, "Cannot open a channel with blacklisted node\n");
- meshlink_errno = MESHLINK_EBLACKLISTED;
- pthread_mutex_unlock(&mesh->mutex);
- return NULL;
- }
-
- meshlink_channel_t *channel = xzalloc(sizeof(*channel));
- channel->node = n;
- channel->receive_cb = cb;
-
- if(data && !len) {
- channel->priv = (void *)data;
- }
-
- channel->c = utcp_connect_ex(n->utcp, port, channel_recv, channel, flags);
-
- pthread_mutex_unlock(&mesh->mutex);
-
- if(!channel->c) {
- meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL;
- free(channel);
- return NULL;
- }
-
- return channel;
-}
-
-meshlink_channel_t *meshlink_channel_open(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_open_ex(%s, %u, %p, %p, %zu)", node ? node->name : "(null)", port, (void *)(intptr_t)cb, data, len);
-
- return meshlink_channel_open_ex(mesh, node, port, cb, data, len, MESHLINK_CHANNEL_TCP);
-}
-
-void meshlink_channel_shutdown(meshlink_handle_t *mesh, meshlink_channel_t *channel, int direction) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_shutdown(%p, %d)", (void *)channel, direction);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- utcp_shutdown(channel->c, direction);
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_close(%p)", (void *)channel);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- if(channel->c) {
- utcp_close(channel->c);
- channel->c = NULL;
- }
-
- if(!channel->in_callback) {
- free(channel);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_channel_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_abort(%p)", (void *)channel);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- if(channel->c) {
- utcp_abort(channel->c);
- channel->c = NULL;
- }
-
- if(!channel->in_callback) {
- free(channel);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_send(%p, %p, %zu)", (void *)channel, data, len);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- if(!len) {
- return 0;
- }
-
- if(!data) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- // TODO: more finegrained locking.
- // Ideally we want to put the data into the UTCP connection's send buffer.
- // Then, preferably only if there is room in the receiver window,
- // kick the meshlink thread to go send packets.
-
- ssize_t retval;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- retval = utcp_send(channel->c, data, len);
-
- pthread_mutex_unlock(&mesh->mutex);
-
- if(retval < 0) {
- meshlink_errno = MESHLINK_ENETWORK;
- }
-
- return retval;
-}
-
-uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- return channel->c->flags;
-}
-
-size_t meshlink_channel_get_mss(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- return utcp_get_mss(channel->node->utcp);
-}
-
-void meshlink_set_node_channel_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_channel_timeout(%s, %d)", node ? node->name : "(null)", timeout);
-
- if(!mesh || !node) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- node_t *n = (node_t *)node;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- if(!n->utcp) {
- n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
- }
-
- utcp_set_user_timeout(n->utcp, timeout);
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-