free(mesh->self->name);
mesh->name = name;
mesh->self->name = xstrdup(name);
- mesh->self->devclass = devclass;
+ mesh->self->devclass = devclass == DEV_CLASS_UNKNOWN ? mesh->devclass : devclass;
// Initialize configuration directory
if(!config_init(mesh, "current")) {
return true;
}
+
static bool sendline(int fd, char *format, ...) {
- static char buffer[4096];
+ char buffer[4096];
char *p = buffer;
int blen = 0;
va_list ap;
return true;
}
-bool meshlink_encrypted_key_rotate(meshlink_handle_t *mesh, const char *new_key, size_t new_keylen) {
- if(!mesh || !new_key || !new_keylen || !*new_key) {
+bool meshlink_encrypted_key_rotate(meshlink_handle_t *mesh, const void *new_key, size_t new_keylen) {
+ if(!mesh || !new_key || !new_keylen) {
logger(mesh, MESHLINK_ERROR, "Invalid arguments given!\n");
meshlink_errno = MESHLINK_EINVAL;
return false;
#ifdef HAVE_SETNS
if(setns(mesh->netns, CLONE_NEWNET) != 0) {
+ pthread_cond_signal(&mesh->cond);
return NULL;
}
#else
+ pthread_cond_signal(&mesh->cond);
return NULL;
#endif // HAVE_SETNS
}
+#if HAVE_CATTA
+
+ if(mesh->discovery) {
+ discovery_start(mesh);
+ }
+
+#endif
+
pthread_mutex_lock(&(mesh->mesh_mutex));
logger(mesh, MESHLINK_DEBUG, "Starting main_loop...\n");
+ pthread_cond_broadcast(&mesh->cond);
main_loop(mesh);
logger(mesh, MESHLINK_DEBUG, "main_loop returned.\n");
pthread_mutex_unlock(&(mesh->mesh_mutex));
+
+#if HAVE_CATTA
+
+ // Stop discovery
+ if(mesh->discovery) {
+ discovery_stop(mesh);
+ }
+
+#endif
+
return NULL;
}
pthread_mutex_lock(&(mesh->mesh_mutex));
+ assert(mesh->self->ecdsa);
+ assert(!memcmp((uint8_t *)mesh->self->ecdsa + 64, (uint8_t *)mesh->private_key + 64, 32));
+
if(mesh->threadstarted) {
logger(mesh, MESHLINK_DEBUG, "thread was already running\n");
pthread_mutex_unlock(&(mesh->mesh_mutex));
return false;
}
+ pthread_cond_wait(&mesh->cond, &mesh->mesh_mutex);
mesh->threadstarted = true;
-#if HAVE_CATTA
-
- if(mesh->discovery) {
- discovery_start(mesh);
- }
-
-#endif
-
- assert(mesh->self->ecdsa);
- assert(!memcmp((uint8_t *)mesh->self->ecdsa + 64, (uint8_t *)mesh->private_key + 64, 32));
-
-
pthread_mutex_unlock(&(mesh->mesh_mutex));
return true;
}
pthread_mutex_lock(&(mesh->mesh_mutex));
logger(mesh, MESHLINK_DEBUG, "meshlink_stop called\n");
-#if HAVE_CATTA
-
- // Stop discovery
- if(mesh->discovery) {
- discovery_stop(mesh);
- }
-
-#endif
-
// Shut down the main thread
event_loop_stop(&mesh->loop);
//Before doing meshlink_join make sure we are not connected to another mesh
if(mesh->threadstarted) {
- logger(mesh, MESHLINK_DEBUG, "Already connected to a mesh\n");
+ logger(mesh, MESHLINK_ERROR, "Cannot join while started\n");
+ meshlink_errno = MESHLINK_EINVAL;
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
+ // Refuse to join a mesh if we are already part of one. We are part of one if we know at least one other node.
+ if(mesh->nodes->count > 1) {
+ logger(mesh, MESHLINK_ERROR, "Already part of an existing mesh\n");
meshlink_errno = MESHLINK_EINVAL;
pthread_mutex_unlock(&(mesh->mesh_mutex));
return false;
if(n->status.destroyed) {
meshlink_channel_close(mesh, channel);
- } else if(channel->receive_cb) {
- channel->receive_cb(mesh, channel, data, len);
+ return len;
+ }
+
+ const char *p = data;
+ size_t left = len;
+
+ while(channel->aio_receive) {
+ meshlink_aio_buffer_t *aio = channel->aio_receive;
+ size_t todo = aio->len - aio->done;
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ memcpy((char *)aio->data + aio->done, p, todo);
+ aio->done += todo;
+
+ if(aio->done == aio->len) {
+ channel->aio_receive = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
+ p += todo;
+ left -= todo;
+
+ if(!left && len) {
+ return len;
+ }
+ }
+
+ if(channel->receive_cb) {
+ channel->receive_cb(mesh, channel, p, left);
}
return len;
node_t *n = channel->node;
meshlink_handle_t *mesh = n->mesh;
+ meshlink_aio_buffer_t *aio = channel->aio_send;
+
+ if(aio) {
+ /* We at least one AIO buffer. Send as much as possible form the first buffer. */
+ size_t left = aio->len - aio->done;
+
+ if(len > left) {
+ len = left;
+ }
+
+ ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+
+ if(sent >= 0) {
+ aio->done += sent;
+ }
+
+ /* If the buffer is now completely sent, call the callback and dispose of it. */
+ if(aio->done >= aio->len) {
+ channel->aio_send = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
- if(channel->poll_cb) {
- channel->poll_cb(mesh, channel, len);
+ free(aio);
+ }
+ } else {
+ if(channel->poll_cb) {
+ channel->poll_cb(mesh, channel, len);
+ } else {
+ utcp_set_poll_cb(connection, NULL);
+ }
}
}
void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
(void)mesh;
channel->poll_cb = cb;
- utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL);
+ utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
}
void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
pthread_mutex_unlock(&mesh->mesh_mutex);
}
+void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ utcp_set_sndbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ utcp_set_rcvbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
if(data || len) {
abort(); // TODO: handle non-NULL data
}
utcp_close(channel->c);
+
+ /* Clean up any outstanding AIO buffers. */
+ for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
+ next = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
+ for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
+ next = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
free(channel);
}
// Then, preferably only if there is room in the receiver window,
// kick the meshlink thread to go send packets.
+ ssize_t retval;
+
pthread_mutex_lock(&mesh->mesh_mutex);
- ssize_t retval = utcp_send(channel->c, data, len);
+
+ /* Disallow direct calls to utcp_send() while we still have AIO active. */
+ if(channel->aio_send) {
+ retval = 0;
+ } else {
+ retval = utcp_send(channel->c, data, len);
+ }
+
pthread_mutex_unlock(&mesh->mesh_mutex);
if(retval < 0) {
return retval;
}
+bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_send;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ /* Ensure the poll callback is set, and call it right now to push data if possible */
+ utcp_set_poll_cb(channel->c, channel_poll);
+ channel_poll(channel->c, len);
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
+bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
if(!mesh || !channel) {
meshlink_errno = MESHLINK_EINVAL;
#endif
}
+void handle_network_change(meshlink_handle_t *mesh, bool online) {
+ (void)online;
+
+ if(!mesh->connections) {
+ return;
+ }
+
+ retry(mesh);
+}
+
static void __attribute__((constructor)) meshlink_init(void) {
crypto_init();
unsigned int seed;
}
/// Device class traits
-dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
+const dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
{ .min_connects = 3, .max_connects = 10000, .edge_weight = 1 }, // DEV_CLASS_BACKBONE
{ .min_connects = 3, .max_connects = 100, .edge_weight = 3 }, // DEV_CLASS_STATIONARY
{ .min_connects = 3, .max_connects = 3, .edge_weight = 6 }, // DEV_CLASS_PORTABLE