#include "packmsg.h"
#include "prf.h"
#include "protocol.h"
-#include "route.h"
#include "sockaddr.h"
#include "utils.h"
#include "xalloc.h"
static struct timespec idle(event_loop_t *loop, void *data) {
(void)loop;
- meshlink_handle_t *mesh = data;
+ (void)data;
- if(mesh->peer && mesh->peer->utcp) {
- return utcp_timeout(mesh->peer->utcp);
- } else {
- return (struct timespec) {
- 3600, 0
- };
- }
+ return (struct timespec) {
+ 3600, 0
+ };
}
static bool meshlink_setup(meshlink_handle_t *mesh) {
pthread_mutex_unlock(&mesh->mutex);
}
-void meshlink_set_node_pmtu_cb(meshlink_handle_t *mesh, meshlink_node_pmtu_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_pmtu_cb(%p)", (void *)(intptr_t)cb);
-
- if(!mesh) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->node_pmtu_cb = cb;
- pthread_mutex_unlock(&mesh->mutex);
-}
-
void meshlink_set_node_duplicate_cb(meshlink_handle_t *mesh, meshlink_node_duplicate_cb_t cb) {
logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_duplicate_cb(%p)", (void *)(intptr_t)cb);
pthread_mutex_unlock(&mesh->mutex);
}
-static bool prepare_packet(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, size_t len, vpn_packet_t *packet) {
- meshlink_packethdr_t *hdr;
-
- if(len > MAXSIZE - sizeof(*hdr)) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- node_t *n = (node_t *)destination;
-
- if(n->status.blacklisted) {
- logger(mesh, MESHLINK_ERROR, "Node %s blacklisted, dropping packet\n", n->name);
- meshlink_errno = MESHLINK_EBLACKLISTED;
- return false;
- }
-
- // Prepare the packet
- packet->probe = false;
- packet->tcp = false;
- packet->len = len + sizeof(*hdr);
-
- hdr = (meshlink_packethdr_t *)packet->data;
- memset(hdr, 0, sizeof(*hdr));
- // leave the last byte as 0 to make sure strings are always
- // null-terminated if they are longer than the buffer
- strncpy((char *)hdr->destination, destination->name, sizeof(hdr->destination) - 1);
- strncpy((char *)hdr->source, mesh->self->name, sizeof(hdr->source) - 1);
-
- memcpy(packet->data + sizeof(*hdr), data, len);
-
- return true;
-}
-
-static bool meshlink_send_immediate(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, size_t len) {
- assert(mesh);
- assert(destination);
- assert(data);
- assert(len);
-
- // Prepare the packet
- if(!prepare_packet(mesh, destination, data, len, mesh->packet)) {
- return false;
- }
-
- // Send it immediately
- route(mesh, mesh->self, mesh->packet);
-
- return true;
-}
-
bool meshlink_send(meshlink_handle_t *mesh, meshlink_node_t *destination, const void *data, size_t len) {
logger(mesh, MESHLINK_DEBUG, "meshlink_send(%s, %p, %zu)", destination ? destination->name : "(null)", data, len);
return true;
}
- if(!data) {
+ if(!data || len > MTU) {
meshlink_errno = MESHLINK_EINVAL;
return false;
}
return false;
}
- if(!prepare_packet(mesh, destination, data, len, packet)) {
- free(packet);
- return false;
- }
+ packet->len = len;
+ memcpy(packet->data, data, len);
// Queue it
if(!meshlink_queue_push(&mesh->outpacketqueue, packet)) {
for(vpn_packet_t *packet; (packet = meshlink_queue_pop(&mesh->outpacketqueue));) {
logger(mesh, MESHLINK_DEBUG, "Removing packet of %d bytes from packet queue", packet->len);
- mesh->self->in_packets++;
- mesh->self->in_bytes += packet->len;
- route(mesh, mesh->self, packet);
+ send_raw_packet(mesh, mesh->peer->connection, packet);
free(packet);
}
}
-ssize_t meshlink_get_pmtu(meshlink_handle_t *mesh, meshlink_node_t *destination) {
- if(!mesh || !destination) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- node_t *n = (node_t *)destination;
-
- if(!n->status.reachable) {
- pthread_mutex_unlock(&mesh->mutex);
- return 0;
-
- } else if(n->mtuprobes > 30 && n->minmtu) {
- pthread_mutex_unlock(&mesh->mutex);
- return n->minmtu;
- } else {
- pthread_mutex_unlock(&mesh->mutex);
- return MTU;
- }
-}
-
char *meshlink_get_fingerprint(meshlink_handle_t *mesh, meshlink_node_t *node) {
if(!mesh || !node) {
meshlink_errno = MESHLINK_EINVAL;
// @TODO do we want to fire off a connection attempt right away?
}
-static bool channel_pre_accept(struct utcp *utcp, uint16_t port) {
- (void)port;
- node_t *n = utcp->priv;
- meshlink_handle_t *mesh = n->mesh;
-
- if(mesh->channel_accept_cb && mesh->channel_listen_cb) {
- return mesh->channel_listen_cb(mesh, (meshlink_node_t *)n, port);
- } else {
- return mesh->channel_accept_cb;
- }
-}
-
-/* Finish one AIO buffer, return true if the channel is still open. */
-static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
- meshlink_aio_buffer_t *aio = *head;
- *head = aio->next;
-
- if(channel->c) {
- channel->in_callback = true;
-
- if(aio->data) {
- if(aio->cb.buffer) {
- aio->cb.buffer(mesh, channel, aio->data, aio->done, aio->priv);
- }
- } else {
- if(aio->cb.fd) {
- aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
- }
- }
-
- channel->in_callback = false;
-
- if(!channel->c) {
- free(aio);
- free(channel);
- return false;
- }
- }
-
- free(aio);
- return true;
-}
-
-/* Finish all AIO buffers, return true if the channel is still open. */
-static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) {
- while(*head) {
- if(!aio_finish_one(mesh, channel, head)) {
- return false;
- }
+void update_node_status(meshlink_handle_t *mesh, node_t *n) {
+ if(mesh->node_status_cb) {
+ mesh->node_status_cb(mesh, (meshlink_node_t *)n, n->status.reachable && !n->status.blacklisted);
}
-
- return true;
}
-static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
- meshlink_channel_t *channel = connection->priv;
-
- if(!channel) {
- abort();
- }
-
- node_t *n = channel->node;
- meshlink_handle_t *mesh = n->mesh;
-
- if(n->status.destroyed) {
- meshlink_channel_close(mesh, channel);
- return len;
- }
-
- const char *p = data;
- size_t left = len;
-
- while(channel->aio_receive) {
- if(!len) {
- /* This receive callback signalled an error, abort all outstanding AIO buffers. */
- if(!aio_abort(mesh, channel, &channel->aio_receive)) {
- return len;
- }
-
- break;
- }
-
- meshlink_aio_buffer_t *aio = channel->aio_receive;
- size_t todo = aio->len - aio->done;
-
- if(todo > left) {
- todo = left;
- }
-
- if(aio->data) {
- memcpy((char *)aio->data + aio->done, p, todo);
- } else {
- ssize_t result = write(aio->fd, p, todo);
-
- if(result <= 0) {
- if(result < 0 && errno == EINTR) {
- continue;
- }
-
- /* Writing to fd failed, cancel just this AIO buffer. */
- logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
-
- if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
- return len;
- }
-
- continue;
- }
-
- todo = result;
- }
-
- aio->done += todo;
- p += todo;
- left -= todo;
-
- if(aio->done == aio->len) {
- if(!aio_finish_one(mesh, channel, &channel->aio_receive)) {
- return len;
- }
- }
-
- if(!left) {
- return len;
- }
- }
-
- if(channel->receive_cb) {
- channel->receive_cb(mesh, channel, p, left);
+void handle_duplicate_node(meshlink_handle_t *mesh, node_t *n) {
+ if(!mesh->node_duplicate_cb || n->status.duplicate) {
+ return;
}
- return len;
+ n->status.duplicate = true;
+ mesh->node_duplicate_cb(mesh, (meshlink_node_t *)n);
}
-static void channel_accept(struct utcp_connection *utcp_connection, uint16_t port) {
- node_t *n = utcp_connection->utcp->priv;
-
- if(!n) {
- abort();
- }
-
- meshlink_handle_t *mesh = n->mesh;
+void meshlink_hint_network_change(struct meshlink_handle *mesh) {
+ logger(mesh, MESHLINK_DEBUG, "meshlink_hint_network_change()");
- if(!mesh->channel_accept_cb) {
+ if(!mesh) {
+ meshlink_errno = MESHLINK_EINVAL;
return;
}
- meshlink_channel_t *channel = xzalloc(sizeof(*channel));
- channel->node = n;
- channel->c = utcp_connection;
-
- if(mesh->channel_accept_cb(mesh, channel, port, NULL, 0)) {
- utcp_accept(utcp_connection, channel_recv, channel);
- } else {
- free(channel);
+ if(pthread_mutex_lock(&mesh->mutex) != 0) {
+ abort();
}
-}
-
-static void channel_retransmit(struct utcp_connection *utcp_connection) {
- node_t *n = utcp_connection->utcp->priv;
- meshlink_handle_t *mesh = n->mesh;
- if(n->mtuprobes == 31 && n->mtutimeout.cb) {
- timeout_set(&mesh->loop, &n->mtutimeout, &(struct timespec) {
- 0, 0
- });
- }
+ pthread_mutex_unlock(&mesh->mutex);
}
-static ssize_t channel_send(struct utcp *utcp, const void *data, size_t len) {
- node_t *n = utcp->priv;
+void meshlink_set_dev_class_timeouts(meshlink_handle_t *mesh, dev_class_t devclass, int pinginterval, int pingtimeout) {
+ logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_timeouts(%d, %d, %d)", devclass, pinginterval, pingtimeout);
- if(n->status.destroyed) {
- return -1;
+ if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
+ meshlink_errno = EINVAL;
+ return;
}
- meshlink_handle_t *mesh = n->mesh;
- return meshlink_send_immediate(mesh, (meshlink_node_t *)n, data, len) ? (ssize_t)len : -1;
-}
-
-void meshlink_set_channel_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_receive_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_receive_cb(%p, %p)", (void *)channel, (void *)(intptr_t)cb);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
+ if(pinginterval < 1 || pingtimeout < 1 || pingtimeout > pinginterval) {
+ meshlink_errno = EINVAL;
return;
}
- channel->receive_cb = cb;
-}
-
-static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, const void *data, size_t len) {
- (void)mesh;
- node_t *n = (node_t *)source;
-
- if(!n->utcp) {
+ if(pthread_mutex_lock(&mesh->mutex) != 0) {
abort();
}
- utcp_recv(n->utcp, data, len);
+ mesh->dev_class_traits[devclass].pinginterval = pinginterval;
+ mesh->dev_class_traits[devclass].pingtimeout = pingtimeout;
+ pthread_mutex_unlock(&mesh->mutex);
}
-static void channel_poll(struct utcp_connection *connection, size_t len) {
- meshlink_channel_t *channel = connection->priv;
-
- if(!channel) {
- abort();
- }
-
- node_t *n = channel->node;
- meshlink_handle_t *mesh = n->mesh;
-
- while(channel->aio_send) {
- if(!len) {
- /* This poll callback signalled an error, abort all outstanding AIO buffers. */
- if(!aio_abort(mesh, channel, &channel->aio_send)) {
- return;
- }
-
- break;
- }
-
- /* We have at least one AIO buffer. Send as much as possible from the buffers. */
- meshlink_aio_buffer_t *aio = channel->aio_send;
- size_t todo = aio->len - aio->done;
- ssize_t sent;
-
- if(todo > len) {
- todo = len;
- }
-
- if(aio->data) {
- sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
- } else {
- /* Limit the amount we read at once to avoid stack overflows */
- if(todo > 65536) {
- todo = 65536;
- }
-
- char buf[todo];
- ssize_t result = read(aio->fd, buf, todo);
-
- if(result > 0) {
- todo = result;
- sent = utcp_send(connection, buf, todo);
- } else {
- if(result < 0 && errno == EINTR) {
- continue;
- }
-
- /* Reading from fd failed, cancel just this AIO buffer. */
- if(result != 0) {
- logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
- }
-
- if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
- return;
- }
-
- continue;
- }
- }
-
- if(sent != (ssize_t)todo) {
- /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
- if(!aio_abort(mesh, channel, &channel->aio_send)) {
- return;
- }
-
- len = 0;
- break;
- }
-
- aio->done += sent;
- len -= sent;
-
- /* If we didn't finish this buffer, exit early. */
- if(aio->done < aio->len) {
- return;
- }
-
- /* Signal completion of this buffer, and go to the next one. */
- if(!aio_finish_one(mesh, channel, &channel->aio_send)) {
- return;
- }
-
- if(!len) {
- return;
- }
- }
+void meshlink_set_dev_class_fast_retry_period(meshlink_handle_t *mesh, dev_class_t devclass, int fast_retry_period) {
+ logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_retry_period(%d, %d)", devclass, fast_retry_period);
- if(channel->poll_cb) {
- channel->poll_cb(mesh, channel, len);
- } else {
- utcp_set_poll_cb(connection, NULL);
+ if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
+ meshlink_errno = EINVAL;
+ return;
}
-}
-
-void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_poll_cb(%p, %p)", (void *)channel, (void *)(intptr_t)cb);
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
+ if(fast_retry_period < 0) {
+ meshlink_errno = EINVAL;
return;
}
abort();
}
- channel->poll_cb = cb;
- utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
+ mesh->dev_class_traits[devclass].fast_retry_period = fast_retry_period;
pthread_mutex_unlock(&mesh->mutex);
}
-void meshlink_set_channel_listen_cb(meshlink_handle_t *mesh, meshlink_channel_listen_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_listen_cb(%p)", (void *)(intptr_t)cb);
+void meshlink_set_dev_class_maxtimeout(struct meshlink_handle *mesh, dev_class_t devclass, int maxtimeout) {
+ logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_maxtimeout(%d, %d)", devclass, maxtimeout);
- if(!mesh) {
- meshlink_errno = MESHLINK_EINVAL;
+ if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
+ meshlink_errno = EINVAL;
+ return;
+ }
+
+ if(maxtimeout < 0) {
+ meshlink_errno = EINVAL;
return;
}
abort();
}
- mesh->channel_listen_cb = cb;
-
+ mesh->dev_class_traits[devclass].maxtimeout = maxtimeout;
pthread_mutex_unlock(&mesh->mutex);
}
-void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_accept_cb(%p)", (void *)(intptr_t)cb);
+void meshlink_reset_timers(struct meshlink_handle *mesh) {
+ logger(mesh, MESHLINK_DEBUG, "meshlink_reset_timers()");
if(!mesh) {
- meshlink_errno = MESHLINK_EINVAL;
return;
}
abort();
}
- mesh->channel_accept_cb = cb;
- mesh->receive_cb = channel_receive;
-
- if(mesh->peer) {
- mesh->peer->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, mesh->peer);
- utcp_set_mtu(mesh->peer->utcp, mesh->peer->mtu - sizeof(meshlink_packethdr_t));
- utcp_set_retransmit_cb(mesh->peer->utcp, channel_retransmit);
- }
+ handle_network_change(mesh, true);
pthread_mutex_unlock(&mesh->mutex);
}
-void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_sndbuf(%p, %zu)", (void *)channel, size);
-
- meshlink_set_channel_sndbuf_storage(mesh, channel, NULL, size);
-}
-
-void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_rcvbuf(%p, %zu)", (void *)channel, size);
-
- meshlink_set_channel_rcvbuf_storage(mesh, channel, NULL, size);
-}
-
-void meshlink_set_channel_sndbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_sndbuf_storage(%p, %p, %zu)", (void *)channel, buf, size);
+void meshlink_set_inviter_commits_first(struct meshlink_handle *mesh, bool inviter_commits_first) {
+ logger(mesh, MESHLINK_DEBUG, "meshlink_set_inviter_commits_first(%d)", inviter_commits_first);
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
+ if(!mesh) {
+ meshlink_errno = EINVAL;
return;
}
abort();
}
- utcp_set_sndbuf(channel->c, buf, size);
+ mesh->inviter_commits_first = inviter_commits_first;
pthread_mutex_unlock(&mesh->mutex);
}
-void meshlink_set_channel_rcvbuf_storage(meshlink_handle_t *mesh, meshlink_channel_t *channel, void *buf, size_t size) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_rcvbuf_storage(%p, %p, %zu)", (void *)channel, buf, size);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- utcp_set_rcvbuf(channel->c, buf, size);
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_channel_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint32_t flags) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_flags(%p, %u)", (void *)channel, flags);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- utcp_set_flags(channel->c, flags);
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_open_ex(%s, %u, %p, %p, %zu, %u)", node ? node->name : "(null)", port, (void *)(intptr_t)cb, data, len, flags);
-
- if(data && len) {
- abort(); // TODO: handle non-NULL data
- }
-
- if(!mesh || !node) {
- meshlink_errno = MESHLINK_EINVAL;
- return NULL;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- node_t *n = (node_t *)node;
-
- if(!n->utcp) {
- n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
- utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
- utcp_set_retransmit_cb(n->utcp, channel_retransmit);
- mesh->receive_cb = channel_receive;
-
- if(!n->utcp) {
- meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL;
- pthread_mutex_unlock(&mesh->mutex);
- return NULL;
- }
- }
-
- if(n->status.blacklisted) {
- logger(mesh, MESHLINK_ERROR, "Cannot open a channel with blacklisted node\n");
- meshlink_errno = MESHLINK_EBLACKLISTED;
- pthread_mutex_unlock(&mesh->mutex);
- return NULL;
- }
-
- meshlink_channel_t *channel = xzalloc(sizeof(*channel));
- channel->node = n;
- channel->receive_cb = cb;
-
- if(data && !len) {
- channel->priv = (void *)data;
- }
-
- channel->c = utcp_connect_ex(n->utcp, port, channel_recv, channel, flags);
-
- pthread_mutex_unlock(&mesh->mutex);
-
- if(!channel->c) {
- meshlink_errno = errno == ENOMEM ? MESHLINK_ENOMEM : MESHLINK_EINTERNAL;
- free(channel);
- return NULL;
- }
-
- return channel;
-}
-
-meshlink_channel_t *meshlink_channel_open(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_open_ex(%s, %u, %p, %p, %zu)", node ? node->name : "(null)", port, (void *)(intptr_t)cb, data, len);
-
- return meshlink_channel_open_ex(mesh, node, port, cb, data, len, MESHLINK_CHANNEL_TCP);
-}
-
-void meshlink_channel_shutdown(meshlink_handle_t *mesh, meshlink_channel_t *channel, int direction) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_shutdown(%p, %d)", (void *)channel, direction);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- utcp_shutdown(channel->c, direction);
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_close(%p)", (void *)channel);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- if(channel->c) {
- utcp_close(channel->c);
- channel->c = NULL;
-
- /* Clean up any outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_send);
- aio_abort(mesh, channel, &channel->aio_receive);
- }
-
- if(!channel->in_callback) {
- free(channel);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_channel_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_abort(%p)", (void *)channel);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- if(channel->c) {
- utcp_abort(channel->c);
- channel->c = NULL;
-
- /* Clean up any outstanding AIO buffers. */
- aio_abort(mesh, channel, &channel->aio_send);
- aio_abort(mesh, channel, &channel->aio_receive);
- }
-
- if(!channel->in_callback) {
- free(channel);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_send(%p, %p, %zu)", (void *)channel, data, len);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- if(!len) {
- return 0;
- }
-
- if(!data) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- // TODO: more finegrained locking.
- // Ideally we want to put the data into the UTCP connection's send buffer.
- // Then, preferably only if there is room in the receiver window,
- // kick the meshlink thread to go send packets.
-
- ssize_t retval;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- /* Disallow direct calls to utcp_send() while we still have AIO active. */
- if(channel->aio_send) {
- retval = 0;
- } else {
- retval = utcp_send(channel->c, data, len);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-
- if(retval < 0) {
- meshlink_errno = MESHLINK_ENETWORK;
- }
-
- return retval;
-}
-
-bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_send(%p, %p, %zu, %p, %p)", (void *)channel, data, len, (void *)(intptr_t)cb, priv);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- if(!len || !data) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
- aio->data = data;
- aio->len = len;
- aio->cb.buffer = cb;
- aio->priv = priv;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- /* Append the AIO buffer descriptor to the end of the chain */
- meshlink_aio_buffer_t **p = &channel->aio_send;
-
- while(*p) {
- p = &(*p)->next;
- }
-
- *p = aio;
-
- /* Ensure the poll callback is set, and call it right now to push data if possible */
- utcp_set_poll_cb(channel->c, channel_poll);
- size_t todo = MIN(len, utcp_get_rcvbuf_free(channel->c));
-
- if(todo) {
- channel_poll(channel->c, todo);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-
- return true;
-}
-
-bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_fd_send(%p, %d, %zu, %p, %p)", (void *)channel, fd, len, (void *)(intptr_t)cb, priv);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- if(!len || fd == -1) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
- aio->fd = fd;
- aio->len = len;
- aio->cb.fd = cb;
- aio->priv = priv;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- /* Append the AIO buffer descriptor to the end of the chain */
- meshlink_aio_buffer_t **p = &channel->aio_send;
-
- while(*p) {
- p = &(*p)->next;
- }
-
- *p = aio;
-
- /* Ensure the poll callback is set, and call it right now to push data if possible */
- utcp_set_poll_cb(channel->c, channel_poll);
- size_t left = utcp_get_rcvbuf_free(channel->c);
-
- if(left) {
- channel_poll(channel->c, left);
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-
- return true;
-}
-
-bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_receive(%p, %p, %zu, %p, %p)", (void *)channel, data, len, (void *)(intptr_t)cb, priv);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- if(!len || !data) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
- aio->data = data;
- aio->len = len;
- aio->cb.buffer = cb;
- aio->priv = priv;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- /* Append the AIO buffer descriptor to the end of the chain */
- meshlink_aio_buffer_t **p = &channel->aio_receive;
-
- while(*p) {
- p = &(*p)->next;
- }
-
- *p = aio;
-
- pthread_mutex_unlock(&mesh->mutex);
-
- return true;
-}
-
-bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_fd_receive(%p, %d, %zu, %p, %p)", (void *)channel, fd, len, (void *)(intptr_t)cb, priv);
-
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- if(!len || fd == -1) {
- meshlink_errno = MESHLINK_EINVAL;
- return false;
- }
-
- meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
- aio->fd = fd;
- aio->len = len;
- aio->cb.fd = cb;
- aio->priv = priv;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- /* Append the AIO buffer descriptor to the end of the chain */
- meshlink_aio_buffer_t **p = &channel->aio_receive;
-
- while(*p) {
- p = &(*p)->next;
- }
-
- *p = aio;
-
- pthread_mutex_unlock(&mesh->mutex);
-
- return true;
-}
-
-uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- return channel->c->flags;
-}
-
-size_t meshlink_channel_get_sendq(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- return utcp_get_sendq(channel->c);
-}
-
-size_t meshlink_channel_get_recvq(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- return utcp_get_recvq(channel->c);
-}
-
-size_t meshlink_channel_get_mss(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
- if(!mesh || !channel) {
- meshlink_errno = MESHLINK_EINVAL;
- return -1;
- }
-
- return utcp_get_mss(channel->node->utcp);
-}
-
-void meshlink_set_node_channel_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_node_channel_timeout(%s, %d)", node ? node->name : "(null)", timeout);
-
- if(!mesh || !node) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- node_t *n = (node_t *)node;
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- if(!n->utcp) {
- n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
- utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
- utcp_set_retransmit_cb(n->utcp, channel_retransmit);
- }
-
- utcp_set_user_timeout(n->utcp, timeout);
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void update_node_status(meshlink_handle_t *mesh, node_t *n) {
- if(n->status.reachable && mesh->channel_accept_cb && !n->utcp) {
- n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
- utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
- utcp_set_retransmit_cb(n->utcp, channel_retransmit);
- }
-
- if(mesh->node_status_cb) {
- mesh->node_status_cb(mesh, (meshlink_node_t *)n, n->status.reachable && !n->status.blacklisted);
- }
-
- if(mesh->node_pmtu_cb) {
- mesh->node_pmtu_cb(mesh, (meshlink_node_t *)n, n->minmtu);
- }
-}
-
-void update_node_pmtu(meshlink_handle_t *mesh, node_t *n) {
- utcp_set_mtu(n->utcp, (n->minmtu > MINMTU ? n->minmtu : MINMTU) - sizeof(meshlink_packethdr_t));
-
- if(mesh->node_pmtu_cb && !n->status.blacklisted) {
- mesh->node_pmtu_cb(mesh, (meshlink_node_t *)n, n->minmtu);
- }
-}
-
-void handle_duplicate_node(meshlink_handle_t *mesh, node_t *n) {
- if(!mesh->node_duplicate_cb || n->status.duplicate) {
- return;
- }
-
- n->status.duplicate = true;
- mesh->node_duplicate_cb(mesh, (meshlink_node_t *)n);
-}
-
-void meshlink_hint_network_change(struct meshlink_handle *mesh) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_hint_network_change()");
-
- if(!mesh) {
- meshlink_errno = MESHLINK_EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_dev_class_timeouts(meshlink_handle_t *mesh, dev_class_t devclass, int pinginterval, int pingtimeout) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_timeouts(%d, %d, %d)", devclass, pinginterval, pingtimeout);
-
- if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(pinginterval < 1 || pingtimeout < 1 || pingtimeout > pinginterval) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->dev_class_traits[devclass].pinginterval = pinginterval;
- mesh->dev_class_traits[devclass].pingtimeout = pingtimeout;
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_dev_class_fast_retry_period(meshlink_handle_t *mesh, dev_class_t devclass, int fast_retry_period) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_retry_period(%d, %d)", devclass, fast_retry_period);
-
- if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(fast_retry_period < 0) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->dev_class_traits[devclass].fast_retry_period = fast_retry_period;
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_dev_class_maxtimeout(struct meshlink_handle *mesh, dev_class_t devclass, int maxtimeout) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_dev_class_fast_maxtimeout(%d, %d)", devclass, maxtimeout);
-
- if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(maxtimeout < 0) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->dev_class_traits[devclass].maxtimeout = maxtimeout;
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_reset_timers(struct meshlink_handle *mesh) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_reset_timers()");
-
- if(!mesh) {
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- handle_network_change(mesh, true);
-
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_inviter_commits_first(struct meshlink_handle *mesh, bool inviter_commits_first) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_inviter_commits_first(%d)", inviter_commits_first);
-
- if(!mesh) {
- meshlink_errno = EINVAL;
- return;
- }
-
- if(pthread_mutex_lock(&mesh->mutex) != 0) {
- abort();
- }
-
- mesh->inviter_commits_first = inviter_commits_first;
- pthread_mutex_unlock(&mesh->mutex);
-}
-
-void meshlink_set_scheduling_granularity(struct meshlink_handle *mesh, long granularity) {
- logger(mesh, MESHLINK_DEBUG, "meshlink_set_scheduling_granularity(%ld)", granularity);
-
- if(!mesh || granularity < 0) {
- meshlink_errno = EINVAL;
- return;
- }
-
- utcp_set_clock_granularity(granularity);
-}
-
void meshlink_set_storage_policy(struct meshlink_handle *mesh, meshlink_storage_policy_t policy) {
logger(mesh, MESHLINK_DEBUG, "meshlink_set_storage_policy(%d)", policy);
static void __attribute__((constructor)) meshlink_init(void) {
crypto_init();
- utcp_set_clock_granularity(10000);
}
static void __attribute__((destructor)) meshlink_exit(void) {