X-Git-Url: http://git.meshlink.io/?a=blobdiff_plain;f=src%2Fmeshlink.c;h=b69c1619e498beec8a288c3960e4abb9983ddcac;hb=f3014d25c8b6f8cc1cf8ab48cabf6fbc8e8311d0;hp=19d9fdca9d32f31c9acd92ea330ce51a3686968f;hpb=664466227602b6aabd4f601cf57183bbfa45bc08;p=meshlink-tiny diff --git a/src/meshlink.c b/src/meshlink.c index 19d9fdc..b69c161 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -30,7 +30,6 @@ #include "packmsg.h" #include "prf.h" #include "protocol.h" -#include "route.h" #include "sockaddr.h" #include "utils.h" #include "xalloc.h" @@ -490,15 +489,11 @@ static bool ecdsa_keygen(meshlink_handle_t *mesh) { static struct timespec idle(event_loop_t *loop, void *data) { (void)loop; - meshlink_handle_t *mesh = data; + (void)data; - if(mesh->peer && mesh->peer->utcp) { - return utcp_timeout(mesh->peer->utcp); - } else { - return (struct timespec) { - 3600, 0 - }; - } + return (struct timespec) { + 3600, 0 + }; } static bool meshlink_setup(meshlink_handle_t *mesh) { @@ -1419,22 +1414,6 @@ void meshlink_set_node_status_cb(meshlink_handle_t *mesh, meshlink_node_status_c pthread_mutex_unlock(&mesh->mutex); } -void meshlink_set_node_pmtu_cb(meshlink_handle_t *mesh, meshlink_node_pmtu_cb_t cb) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_pmtu_cb(%p)", (void *)(intptr_t)cb); - - if(!mesh) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - mesh->node_pmtu_cb = cb; - pthread_mutex_unlock(&mesh->mutex); -} - void meshlink_set_node_duplicate_cb(meshlink_handle_t *mesh, meshlink_node_duplicate_cb_t cb) { logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_duplicate_cb(%p)", (void *)(intptr_t)cb); @@ -1484,56 +1463,6 @@ void meshlink_set_error_cb(struct meshlink_handle *mesh, meshlink_error_cb_t cb) pthread_mutex_unlock(&mesh->mutex); } -static bool prepare_packet(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, size_t len, vpn_packet_t *packet) { - meshlink_packethdr_t *hdr; - - if(len > MAXSIZE - sizeof(*hdr)) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - node_t *n = (node_t *)destination; - - if(n->status.blacklisted) { - logger(mesh, MESHLINK_ERROR, "Node %s blacklisted, dropping packet\n", n->name); - meshlink_errno = MESHLINK_EBLACKLISTED; - return false; - } - - // Prepare the packet - packet->probe = false; - packet->tcp = false; - packet->len = len + sizeof(*hdr); - - hdr = (meshlink_packethdr_t *)packet->data; - memset(hdr, 0, sizeof(*hdr)); - // leave the last byte as 0 to make sure strings are always - // null-terminated if they are longer than the buffer - strncpy((char *)hdr->destination, destination->name, sizeof(hdr->destination) - 1); - strncpy((char *)hdr->source, mesh->self->name, sizeof(hdr->source) - 1); - - memcpy(packet->data + sizeof(*hdr), data, len); - - return true; -} - -static bool meshlink_send_immediate(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, size_t len) { - assert(mesh); - assert(destination); - assert(data); - assert(len); - - // Prepare the packet - if(!prepare_packet(mesh, destination, data, len, mesh->packet)) { - return false; - } - - // Send it immediately - route(mesh, mesh->self, mesh->packet); - - return true; -} - bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, size_t len) { logger(mesh, MESHLINK_DEBUG, "meshlink_send(%s, %p, %zu)", destination ? destination->name : "(null)", data, len); @@ -1547,7 +1476,7 @@ bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const return true; } - if(!data) { + if(!data || len > MTU) { meshlink_errno = MESHLINK_EINVAL; return false; } @@ -1560,10 +1489,8 @@ bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const return false; } - if(!prepare_packet(mesh, destination, data, len, packet)) { - free(packet); - return false; - } + packet->len = len; + memcpy(packet->data, data, len); // Queue it if(!meshlink_queue_push(&mesh->outpacketqueue, packet)) { @@ -1588,38 +1515,11 @@ void meshlink_send_from_queue(event_loop_t *loop, void *data) { for(vpn_packet_t *packet; (packet = meshlink_queue_pop(&mesh->outpacketqueue));) { logger(mesh, MESHLINK_DEBUG, "Removing packet of %d bytes from packet queue", packet->len); - mesh->self->in_packets++; - mesh->self->in_bytes += packet->len; - route(mesh, mesh->self, packet); + send_raw_packet(mesh, mesh->peer->connection, packet); free(packet); } } -ssize_t meshlink_get_pmtu(meshlink_handle_t *mesh, meshlink_node_t *destination) { - if(!mesh || !destination) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - node_t *n = (node_t *)destination; - - if(!n->status.reachable) { - pthread_mutex_unlock(&mesh->mutex); - return 0; - - } else if(n->mtuprobes > 30 && n->minmtu) { - pthread_mutex_unlock(&mesh->mutex); - return n->minmtu; - } else { - pthread_mutex_unlock(&mesh->mutex); - return MTU; - } -} - char *meshlink_get_fingerprint(meshlink_handle_t *mesh, meshlink_node_t *node) { if(!mesh || !node) { meshlink_errno = MESHLINK_EINVAL; @@ -2289,309 +2189,68 @@ void meshlink_hint_address(meshlink_handle_t *mesh, meshlink_node_t *node, const // @TODO do we want to fire off a connection attempt right away? } -static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { - (void)port; - node_t *n = utcp->priv; - meshlink_handle_t *mesh = n->mesh; - - if(mesh->channel_accept_cb && mesh->channel_listen_cb) { - return mesh->channel_listen_cb(mesh, (meshlink_node_t *)n, port); - } else { - return mesh->channel_accept_cb; - } -} - -/* Finish one AIO buffer, return true if the channel is still open. */ -static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { - meshlink_aio_buffer_t *aio = *head; - *head = aio->next; - - if(channel->c) { - channel->in_callback = true; - - if(aio->data) { - if(aio->cb.buffer) { - aio->cb.buffer(mesh, channel, aio->data, aio->done, aio->priv); - } - } else { - if(aio->cb.fd) { - aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); - } - } - - channel->in_callback = false; - - if(!channel->c) { - free(aio); - free(channel); - return false; - } - } - - free(aio); - return true; -} - -/* Finish all AIO buffers, return true if the channel is still open. */ -static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { - while(*head) { - if(!aio_finish_one(mesh, channel, head)) { - return false; - } +void update_node_status(meshlink_handle_t *mesh, node_t *n) { + if(mesh->node_status_cb) { + mesh->node_status_cb(mesh, (meshlink_node_t *)n, n->status.reachable && !n->status.blacklisted); } - - return true; } -static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { - meshlink_channel_t *channel = connection->priv; - - if(!channel) { - abort(); - } - - node_t *n = channel->node; - meshlink_handle_t *mesh = n->mesh; - - if(n->status.destroyed) { - meshlink_channel_close(mesh, channel); - return len; - } - - const char *p = data; - size_t left = len; - - while(channel->aio_receive) { - if(!len) { - /* This receive callback signalled an error, abort all outstanding AIO buffers. */ - if(!aio_abort(mesh, channel, &channel->aio_receive)) { - return len; - } - - break; - } - - meshlink_aio_buffer_t *aio = channel->aio_receive; - size_t todo = aio->len - aio->done; - - if(todo > left) { - todo = left; - } - - if(aio->data) { - memcpy((char *)aio->data + aio->done, p, todo); - } else { - ssize_t result = write(aio->fd, p, todo); - - if(result <= 0) { - if(result < 0 && errno == EINTR) { - continue; - } - - /* Writing to fd failed, cancel just this AIO buffer. */ - logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno)); - - if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { - return len; - } - - continue; - } - - todo = result; - } - - aio->done += todo; - p += todo; - left -= todo; - - if(aio->done == aio->len) { - if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { - return len; - } - } - - if(!left) { - return len; - } - } - - if(channel->receive_cb) { - channel->receive_cb(mesh, channel, p, left); +void handle_duplicate_node(meshlink_handle_t *mesh, node_t *n) { + if(!mesh->node_duplicate_cb || n->status.duplicate) { + return; } - return len; + n->status.duplicate = true; + mesh->node_duplicate_cb(mesh, (meshlink_node_t *)n); } -static void channel_accept(struct utcp_connection *utcp_connection, uint16_t port) { - node_t *n = utcp_connection->utcp->priv; - - if(!n) { - abort(); - } - - meshlink_handle_t *mesh = n->mesh; +void meshlink_hint_network_change(struct meshlink_handle *mesh) { + logger(mesh, MESHLINK_DEBUG, "meshlink_hint_network_change()"); - if(!mesh->channel_accept_cb) { + if(!mesh) { + meshlink_errno = MESHLINK_EINVAL; return; } - meshlink_channel_t *channel = xzalloc(sizeof(*channel)); - channel->node = n; - channel->c = utcp_connection; - - if(mesh->channel_accept_cb(mesh, channel, port, NULL, 0)) { - utcp_accept(utcp_connection, channel_recv, channel); - } else { - free(channel); + if(pthread_mutex_lock(&mesh->mutex) != 0) { + abort(); } -} - -static void channel_retransmit(struct utcp_connection *utcp_connection) { - node_t *n = utcp_connection->utcp->priv; - meshlink_handle_t *mesh = n->mesh; - if(n->mtuprobes == 31 && n->mtutimeout.cb) { - timeout_set(&mesh->loop, &n->mtutimeout, &(struct timespec) { - 0, 0 - }); - } + pthread_mutex_unlock(&mesh->mutex); } -static ssize_t channel_send(struct utcp *utcp, const void *data, size_t len) { - node_t *n = utcp->priv; +void meshlink_set_dev_class_timeouts(meshlink_handle_t *mesh, dev_class_t devclass, int pinginterval, int pingtimeout) { + logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_timeouts(%d, %d, %d)", devclass, pinginterval, pingtimeout); - if(n->status.destroyed) { - return -1; + if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) { + meshlink_errno = EINVAL; + return; } - meshlink_handle_t *mesh = n->mesh; - return meshlink_send_immediate(mesh, (meshlink_node_t *)n, data, len) ? (ssize_t)len : -1; -} - -void meshlink_set_channel_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_receive_cb_t cb) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_receive_cb(%p, %p)", (void *)channel, (void *)(intptr_t)cb); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; + if(pinginterval < 1 || pingtimeout < 1 || pingtimeout > pinginterval) { + meshlink_errno = EINVAL; return; } - channel->receive_cb = cb; -} - -static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, const void *data, size_t len) { - (void)mesh; - node_t *n = (node_t *)source; - - if(!n->utcp) { + if(pthread_mutex_lock(&mesh->mutex) != 0) { abort(); } - utcp_recv(n->utcp, data, len); + mesh->dev_class_traits[devclass].pinginterval = pinginterval; + mesh->dev_class_traits[devclass].pingtimeout = pingtimeout; + pthread_mutex_unlock(&mesh->mutex); } -static void channel_poll(struct utcp_connection *connection, size_t len) { - meshlink_channel_t *channel = connection->priv; - - if(!channel) { - abort(); - } - - node_t *n = channel->node; - meshlink_handle_t *mesh = n->mesh; - - while(channel->aio_send) { - if(!len) { - /* This poll callback signalled an error, abort all outstanding AIO buffers. */ - if(!aio_abort(mesh, channel, &channel->aio_send)) { - return; - } - - break; - } - - /* We have at least one AIO buffer. Send as much as possible from the buffers. */ - meshlink_aio_buffer_t *aio = channel->aio_send; - size_t todo = aio->len - aio->done; - ssize_t sent; - - if(todo > len) { - todo = len; - } - - if(aio->data) { - sent = utcp_send(connection, (char *)aio->data + aio->done, todo); - } else { - /* Limit the amount we read at once to avoid stack overflows */ - if(todo > 65536) { - todo = 65536; - } - - char buf[todo]; - ssize_t result = read(aio->fd, buf, todo); - - if(result > 0) { - todo = result; - sent = utcp_send(connection, buf, todo); - } else { - if(result < 0 && errno == EINTR) { - continue; - } - - /* Reading from fd failed, cancel just this AIO buffer. */ - if(result != 0) { - logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno)); - } - - if(!aio_finish_one(mesh, channel, &channel->aio_send)) { - return; - } - - continue; - } - } - - if(sent != (ssize_t)todo) { - /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */ - if(!aio_abort(mesh, channel, &channel->aio_send)) { - return; - } - - len = 0; - break; - } - - aio->done += sent; - len -= sent; - - /* If we didn't finish this buffer, exit early. */ - if(aio->done < aio->len) { - return; - } - - /* Signal completion of this buffer, and go to the next one. */ - if(!aio_finish_one(mesh, channel, &channel->aio_send)) { - return; - } - - if(!len) { - return; - } - } +void meshlink_set_dev_class_fast_retry_period(meshlink_handle_t *mesh, dev_class_t devclass, int fast_retry_period) { + logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_retry_period(%d, %d)", devclass, fast_retry_period); - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, len); - } else { - utcp_set_poll_cb(connection, NULL); + if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) { + meshlink_errno = EINVAL; + return; } -} - -void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_poll_cb(%p, %p)", (void *)channel, (void *)(intptr_t)cb); - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; + if(fast_retry_period < 0) { + meshlink_errno = EINVAL; return; } @@ -2599,16 +2258,20 @@ void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *c abort(); } - channel->poll_cb = cb; - utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL); + mesh->dev_class_traits[devclass].fast_retry_period = fast_retry_period; pthread_mutex_unlock(&mesh->mutex); } -void meshlink_set_channel_listen_cb(meshlink_handle_t *mesh, meshlink_channel_listen_cb_t cb) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_listen_cb(%p)", (void *)(intptr_t)cb); +void meshlink_set_dev_class_maxtimeout(struct meshlink_handle *mesh, dev_class_t devclass, int maxtimeout) { + logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_maxtimeout(%d, %d)", devclass, maxtimeout); - if(!mesh) { - meshlink_errno = MESHLINK_EINVAL; + if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) { + meshlink_errno = EINVAL; + return; + } + + if(maxtimeout < 0) { + meshlink_errno = EINVAL; return; } @@ -2616,16 +2279,14 @@ void meshlink_set_channel_listen_cb(meshlink_handle_t *mesh, meshlink_channel_li abort(); } - mesh->channel_listen_cb = cb; - + mesh->dev_class_traits[devclass].maxtimeout = maxtimeout; pthread_mutex_unlock(&mesh->mutex); } -void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_accept_cb(%p)", (void *)(intptr_t)cb); +void meshlink_reset_timers(struct meshlink_handle *mesh) { + logger(mesh, MESHLINK_DEBUG, "meshlink_reset_timers()"); if(!mesh) { - meshlink_errno = MESHLINK_EINVAL; return; } @@ -2633,35 +2294,16 @@ void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_ac abort(); } - mesh->channel_accept_cb = cb; - mesh->receive_cb = channel_receive; - - if(mesh->peer) { - mesh->peer->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, mesh->peer); - utcp_set_mtu(mesh->peer->utcp, mesh->peer->mtu - sizeof(meshlink_packethdr_t)); - utcp_set_retransmit_cb(mesh->peer->utcp, channel_retransmit); - } + handle_network_change(mesh, true); pthread_mutex_unlock(&mesh->mutex); } -void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_sndbuf(%p, %zu)", (void *)channel, size); - - meshlink_set_channel_sndbuf_storage(mesh, channel, NULL, size); -} - -void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_rcvbuf(%p, %zu)", (void *)channel, size); - - meshlink_set_channel_rcvbuf_storage(mesh, channel, NULL, size); -} - -void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_sndbuf_storage(%p, %p, %zu)", (void *)channel, buf, size); +void meshlink_set_inviter_commits_first(struct meshlink_handle *mesh, bool inviter_commits_first) { + logger(mesh, MESHLINK_DEBUG, "meshlink_set_inviter_commits_first(%d)", inviter_commits_first); - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; + if(!mesh) { + meshlink_errno = EINVAL; return; } @@ -2669,603 +2311,10 @@ void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_chann abort(); } - utcp_set_sndbuf(channel->c, buf, size); + mesh->inviter_commits_first = inviter_commits_first; pthread_mutex_unlock(&mesh->mutex); } -void meshlink_set_channel_rcvbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_rcvbuf_storage(%p, %p, %zu)", (void *)channel, buf, size); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - utcp_set_rcvbuf(channel->c, buf, size); - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_set_channel_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint32_t flags) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_flags(%p, %u)", (void *)channel, flags); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - utcp_set_flags(channel->c, flags); - pthread_mutex_unlock(&mesh->mutex); -} - -meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_open_ex(%s, %u, %p, %p, %zu, %u)", node ? node->name : "(null)", port, (void *)(intptr_t)cb, data, len, flags); - - if(data && len) { - abort(); // TODO: handle non-NULL data - } - - if(!mesh || !node) { - meshlink_errno = MESHLINK_EINVAL; - return NULL; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - node_t *n = (node_t *)node; - - if(!n->utcp) { - n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n); - utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t)); - utcp_set_retransmit_cb(n->utcp, channel_retransmit); - mesh->receive_cb = channel_receive; - - if(!n->utcp) { - meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL; - pthread_mutex_unlock(&mesh->mutex); - return NULL; - } - } - - if(n->status.blacklisted) { - logger(mesh, MESHLINK_ERROR, "Cannot open a channel with blacklisted node\n"); - meshlink_errno = MESHLINK_EBLACKLISTED; - pthread_mutex_unlock(&mesh->mutex); - return NULL; - } - - meshlink_channel_t *channel = xzalloc(sizeof(*channel)); - channel->node = n; - channel->receive_cb = cb; - - if(data && !len) { - channel->priv = (void *)data; - } - - channel->c = utcp_connect_ex(n->utcp, port, channel_recv, channel, flags); - - pthread_mutex_unlock(&mesh->mutex); - - if(!channel->c) { - meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL; - free(channel); - return NULL; - } - - return channel; -} - -meshlink_channel_t *meshlink_channel_open(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_open_ex(%s, %u, %p, %p, %zu)", node ? node->name : "(null)", port, (void *)(intptr_t)cb, data, len); - - return meshlink_channel_open_ex(mesh, node, port, cb, data, len, MESHLINK_CHANNEL_TCP); -} - -void meshlink_channel_shutdown(meshlink_handle_t *mesh, meshlink_channel_t *channel, int direction) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_shutdown(%p, %d)", (void *)channel, direction); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - utcp_shutdown(channel->c, direction); - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_close(%p)", (void *)channel); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - if(channel->c) { - utcp_close(channel->c); - channel->c = NULL; - - /* Clean up any outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); - aio_abort(mesh, channel, &channel->aio_receive); - } - - if(!channel->in_callback) { - free(channel); - } - - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_channel_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_abort(%p)", (void *)channel); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - if(channel->c) { - utcp_abort(channel->c); - channel->c = NULL; - - /* Clean up any outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); - aio_abort(mesh, channel, &channel->aio_receive); - } - - if(!channel->in_callback) { - free(channel); - } - - pthread_mutex_unlock(&mesh->mutex); -} - -ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_send(%p, %p, %zu)", (void *)channel, data, len); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - if(!len) { - return 0; - } - - if(!data) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - // TODO: more finegrained locking. - // Ideally we want to put the data into the UTCP connection's send buffer. - // Then, preferably only if there is room in the receiver window, - // kick the meshlink thread to go send packets. - - ssize_t retval; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Disallow direct calls to utcp_send() while we still have AIO active. */ - if(channel->aio_send) { - retval = 0; - } else { - retval = utcp_send(channel->c, data, len); - } - - pthread_mutex_unlock(&mesh->mutex); - - if(retval < 0) { - meshlink_errno = MESHLINK_ENETWORK; - } - - return retval; -} - -bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_send(%p, %p, %zu, %p, %p)", (void *)channel, data, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || !data) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->data = data; - aio->len = len; - aio->cb.buffer = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_send; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - /* Ensure the poll callback is set, and call it right now to push data if possible */ - utcp_set_poll_cb(channel->c, channel_poll); - size_t todo = MIN(len, utcp_get_rcvbuf_free(channel->c)); - - if(todo) { - channel_poll(channel->c, todo); - } - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_fd_send(%p, %d, %zu, %p, %p)", (void *)channel, fd, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || fd == -1) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->fd = fd; - aio->len = len; - aio->cb.fd = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_send; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - /* Ensure the poll callback is set, and call it right now to push data if possible */ - utcp_set_poll_cb(channel->c, channel_poll); - size_t left = utcp_get_rcvbuf_free(channel->c); - - if(left) { - channel_poll(channel->c, left); - } - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_receive(%p, %p, %zu, %p, %p)", (void *)channel, data, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || !data) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->data = data; - aio->len = len; - aio->cb.buffer = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_receive; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_fd_receive(%p, %d, %zu, %p, %p)", (void *)channel, fd, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || fd == -1) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->fd = fd; - aio->len = len; - aio->cb.fd = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_receive; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) { - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - return channel->c->flags; -} - -size_t meshlink_channel_get_sendq(meshlink_handle_t *mesh, meshlink_channel_t *channel) { - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - return utcp_get_sendq(channel->c); -} - -size_t meshlink_channel_get_recvq(meshlink_handle_t *mesh, meshlink_channel_t *channel) { - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - return utcp_get_recvq(channel->c); -} - -size_t meshlink_channel_get_mss(meshlink_handle_t *mesh, meshlink_channel_t *channel) { - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return -1; - } - - return utcp_get_mss(channel->node->utcp); -} - -void meshlink_set_node_channel_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_channel_timeout(%s, %d)", node ? node->name : "(null)", timeout); - - if(!mesh || !node) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - node_t *n = (node_t *)node; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - if(!n->utcp) { - n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n); - utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t)); - utcp_set_retransmit_cb(n->utcp, channel_retransmit); - } - - utcp_set_user_timeout(n->utcp, timeout); - - pthread_mutex_unlock(&mesh->mutex); -} - -void update_node_status(meshlink_handle_t *mesh, node_t *n) { - if(n->status.reachable && mesh->channel_accept_cb && !n->utcp) { - n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n); - utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t)); - utcp_set_retransmit_cb(n->utcp, channel_retransmit); - } - - if(mesh->node_status_cb) { - mesh->node_status_cb(mesh, (meshlink_node_t *)n, n->status.reachable && !n->status.blacklisted); - } - - if(mesh->node_pmtu_cb) { - mesh->node_pmtu_cb(mesh, (meshlink_node_t *)n, n->minmtu); - } -} - -void update_node_pmtu(meshlink_handle_t *mesh, node_t *n) { - utcp_set_mtu(n->utcp, (n->minmtu > MINMTU ? n->minmtu : MINMTU) - sizeof(meshlink_packethdr_t)); - - if(mesh->node_pmtu_cb && !n->status.blacklisted) { - mesh->node_pmtu_cb(mesh, (meshlink_node_t *)n, n->minmtu); - } -} - -void handle_duplicate_node(meshlink_handle_t *mesh, node_t *n) { - if(!mesh->node_duplicate_cb || n->status.duplicate) { - return; - } - - n->status.duplicate = true; - mesh->node_duplicate_cb(mesh, (meshlink_node_t *)n); -} - -void meshlink_hint_network_change(struct meshlink_handle *mesh) { - logger(mesh, MESHLINK_DEBUG, "meshlink_hint_network_change()"); - - if(!mesh) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_set_dev_class_timeouts(meshlink_handle_t *mesh, dev_class_t devclass, int pinginterval, int pingtimeout) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_timeouts(%d, %d, %d)", devclass, pinginterval, pingtimeout); - - if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) { - meshlink_errno = EINVAL; - return; - } - - if(pinginterval < 1 || pingtimeout < 1 || pingtimeout > pinginterval) { - meshlink_errno = EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - mesh->dev_class_traits[devclass].pinginterval = pinginterval; - mesh->dev_class_traits[devclass].pingtimeout = pingtimeout; - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_set_dev_class_fast_retry_period(meshlink_handle_t *mesh, dev_class_t devclass, int fast_retry_period) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_retry_period(%d, %d)", devclass, fast_retry_period); - - if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) { - meshlink_errno = EINVAL; - return; - } - - if(fast_retry_period < 0) { - meshlink_errno = EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - mesh->dev_class_traits[devclass].fast_retry_period = fast_retry_period; - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_set_dev_class_maxtimeout(struct meshlink_handle *mesh, dev_class_t devclass, int maxtimeout) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_maxtimeout(%d, %d)", devclass, maxtimeout); - - if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) { - meshlink_errno = EINVAL; - return; - } - - if(maxtimeout < 0) { - meshlink_errno = EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - mesh->dev_class_traits[devclass].maxtimeout = maxtimeout; - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_reset_timers(struct meshlink_handle *mesh) { - logger(mesh, MESHLINK_DEBUG, "meshlink_reset_timers()"); - - if(!mesh) { - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - handle_network_change(mesh, true); - - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_set_inviter_commits_first(struct meshlink_handle *mesh, bool inviter_commits_first) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_inviter_commits_first(%d)", inviter_commits_first); - - if(!mesh) { - meshlink_errno = EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - mesh->inviter_commits_first = inviter_commits_first; - pthread_mutex_unlock(&mesh->mutex); -} - -void meshlink_set_scheduling_granularity(struct meshlink_handle *mesh, long granularity) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_scheduling_granularity(%ld)", granularity); - - if(!mesh || granularity < 0) { - meshlink_errno = EINVAL; - return; - } - - utcp_set_clock_granularity(granularity); -} - void meshlink_set_storage_policy(struct meshlink_handle *mesh, meshlink_storage_policy_t policy) { logger(mesh, MESHLINK_DEBUG, "meshlink_set_storage_policy(%d)", policy); @@ -3310,7 +2359,6 @@ void call_error_cb(meshlink_handle_t *mesh, meshlink_errno_t cb_errno) { static void __attribute__((constructor)) meshlink_init(void) { crypto_init(); - utcp_set_clock_granularity(10000); } static void __attribute__((destructor)) meshlink_exit(void) {