}
static bool write_main_config_files(meshlink_handle_t *mesh) {
+ if(!mesh->confbase) {
+ return true;
+ }
+
uint8_t buf[4096];
/* Write the main config file */
config_t config = {buf, packmsg_output_size(&out, buf)};
- if(!main_config_write(mesh, &config)) {
+ if(!main_config_write(mesh, "current", &config, mesh->config_key)) {
return false;
}
free(mesh->self->name);
mesh->name = name;
mesh->self->name = xstrdup(name);
- mesh->self->devclass = devclass;
+ mesh->self->devclass = devclass == DEV_CLASS_UNKNOWN ? mesh->devclass : devclass;
// Initialize configuration directory
- if(!config_init(mesh)) {
+ if(!config_init(mesh, "current")) {
return false;
}
node_add(mesh, n);
- if(!config_write(mesh, n->name, &config)) {
+ if(!config_write(mesh, "current", n->name, &config, mesh->config_key)) {
return false;
}
}
return true;
}
+
static bool sendline(int fd, char *format, ...) {
- static char buffer[4096];
+ char buffer[4096];
char *p = buffer;
int blen = 0;
va_list ap;
}
static bool meshlink_setup(meshlink_handle_t *mesh) {
- if(!config_init(mesh)) {
- logger(mesh, MESHLINK_ERROR, "Could not set up configuration in %s: %s\n", mesh->confbase, strerror(errno));
+ if(!config_init(mesh, "current")) {
+ logger(mesh, MESHLINK_ERROR, "Could not set up configuration in %s/current: %s\n", mesh->confbase, strerror(errno));
meshlink_errno = MESHLINK_ESTORAGE;
return false;
}
mesh->self->ecdsa = ecdsa_set_public_key(ecdsa_get_public_key(mesh->private_key));
if(!write_main_config_files(mesh)) {
+ logger(mesh, MESHLINK_ERROR, "Could not write main config files into %s/current: %s\n", mesh->confbase, strerror(errno));
+ meshlink_errno = MESHLINK_ESTORAGE;
return false;
}
config_t config;
- if(!main_config_read(mesh, &config)) {
+ if(!main_config_read(mesh, "current", &config, mesh->config_key)) {
logger(NULL, MESHLINK_ERROR, "Could not read main configuration file!");
return false;
}
}
bool meshlink_encrypted_key_rotate(meshlink_handle_t *mesh, const void *new_key, size_t new_keylen) {
+ if(!mesh || !new_key || !new_keylen) {
+ logger(mesh, MESHLINK_ERROR, "Invalid arguments given!\n");
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ pthread_mutex_lock(&(mesh->mesh_mutex));
+
+ // Create hash for the new key
+ void *new_config_key;
+ new_config_key = xmalloc(CHACHA_POLY1305_KEYLEN);
+
+ if(!prf(new_key, new_keylen, "MeshLink configuration key", 26, new_config_key, CHACHA_POLY1305_KEYLEN)) {
+ logger(mesh, MESHLINK_ERROR, "Error creating new configuration key!\n");
+ meshlink_errno = MESHLINK_EINTERNAL;
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
+ // Copy contents of the "current" confbase sub-directory to "new" confbase sub-directory with the new key
+
+ if(!config_copy(mesh, "current", mesh->config_key, "new", new_config_key)) {
+ logger(mesh, MESHLINK_ERROR, "Could not set up configuration in %s/old: %s\n", mesh->confbase, strerror(errno));
+ meshlink_errno = MESHLINK_ESTORAGE;
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
- // While copying old config files to new config files
devtool_keyrotate_probe(1);
- // After completed creating new config files in confbase/new/
+
+ main_config_unlock(mesh);
+
+ // Rename confbase/current/ to confbase/old
+
+ if(!config_rename(mesh, "current", "old")) {
+ logger(mesh, MESHLINK_ERROR, "Cannot rename %s/current to %s/old\n", mesh->confbase, mesh->confbase);
+ meshlink_errno = MESHLINK_ESTORAGE;
+ main_config_lock(mesh);
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
devtool_keyrotate_probe(2);
- // Rename confbase/current to confbase/old/
- devtool_keyrotate_probe(3);
+
// Rename confbase/new/ to confbase/current
- devtool_keyrotate_probe(4);
- // Before deleting old sub-directory
- devtool_keyrotate_probe(5);
- return false;
+ if(!config_rename(mesh, "new", "current")) {
+ logger(mesh, MESHLINK_ERROR, "Cannot rename %s/new to %s/current\n", mesh->confbase, mesh->confbase);
+ meshlink_errno = MESHLINK_ESTORAGE;
+ main_config_lock(mesh);
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
+ devtool_keyrotate_probe(3);
+
+ if(!main_config_lock(mesh)) {
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
+ // Cleanup the "old" confbase sub-directory
+
+ if(!config_destroy(mesh->confbase, "old")) {
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
+ // Change the mesh handle key with new key
+
+ free(mesh->config_key);
+ mesh->config_key = new_config_key;
+
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+
+ return true;
}
void meshlink_open_params_free(meshlink_open_params_t *params) {
// If no configuration exists yet, create it.
- if(!main_config_exists(mesh)) {
+ if(!meshlink_confbase_exists(mesh)) {
if(!meshlink_setup(mesh)) {
logger(NULL, MESHLINK_ERROR, "Cannot create initial configuration\n");
meshlink_close(mesh);
#ifdef HAVE_SETNS
if(setns(mesh->netns, CLONE_NEWNET) != 0) {
+ pthread_cond_signal(&mesh->cond);
return NULL;
}
#else
+ pthread_cond_signal(&mesh->cond);
return NULL;
#endif // HAVE_SETNS
}
+#if HAVE_CATTA
+
+ if(mesh->discovery) {
+ discovery_start(mesh);
+ }
+
+#endif
+
pthread_mutex_lock(&(mesh->mesh_mutex));
logger(mesh, MESHLINK_DEBUG, "Starting main_loop...\n");
+ pthread_cond_broadcast(&mesh->cond);
main_loop(mesh);
logger(mesh, MESHLINK_DEBUG, "main_loop returned.\n");
pthread_mutex_unlock(&(mesh->mesh_mutex));
+
+#if HAVE_CATTA
+
+ // Stop discovery
+ if(mesh->discovery) {
+ discovery_stop(mesh);
+ }
+
+#endif
+
return NULL;
}
pthread_mutex_lock(&(mesh->mesh_mutex));
+ assert(mesh->self->ecdsa);
+ assert(!memcmp((uint8_t *)mesh->self->ecdsa + 64, (uint8_t *)mesh->private_key + 64, 32));
+
if(mesh->threadstarted) {
logger(mesh, MESHLINK_DEBUG, "thread was already running\n");
pthread_mutex_unlock(&(mesh->mesh_mutex));
return false;
}
+ pthread_cond_wait(&mesh->cond, &mesh->mesh_mutex);
mesh->threadstarted = true;
-#if HAVE_CATTA
-
- if(mesh->discovery) {
- discovery_start(mesh);
- }
-
-#endif
-
- assert(mesh->self->ecdsa);
- assert(!memcmp((uint8_t *)mesh->self->ecdsa + 64, (uint8_t *)mesh->private_key + 64, 32));
-
-
pthread_mutex_unlock(&(mesh->mesh_mutex));
return true;
}
pthread_mutex_lock(&(mesh->mesh_mutex));
logger(mesh, MESHLINK_DEBUG, "meshlink_stop called\n");
-#if HAVE_CATTA
-
- // Stop discovery
- if(mesh->discovery) {
- discovery_stop(mesh);
- }
-
-#endif
-
// Shut down the main thread
event_loop_stop(&mesh->loop);
return false;
}
- return config_destroy(confbase);
+ if(!config_destroy(confbase, "current")) {
+ logger(NULL, MESHLINK_ERROR, "Cannot remove confbase sub-directories %s: %s\n", confbase, strerror(errno));
+ return false;
+ }
+
+ config_destroy(confbase, "new");
+ config_destroy(confbase, "old");
+
+ if(rmdir(confbase) && errno != ENOENT) {
+ logger(NULL, MESHLINK_ERROR, "Cannot remove directory %s: %s\n", confbase, strerror(errno));
+ meshlink_errno = MESHLINK_ESTORAGE;
+ return false;
+ }
+
+ return true;
}
void meshlink_set_receive_cb(meshlink_handle_t *mesh, meshlink_receive_cb_t cb) {
static bool search_node_by_dev_class(const node_t *node, const void *condition) {
dev_class_t *devclass = (dev_class_t *)condition;
- if(*devclass == node->devclass) {
+ if(*devclass == (dev_class_t)node->devclass) {
return true;
}
mesh->self = new_node();
mesh->self->name = xstrdup(mesh->name);
mesh->self->devclass = mesh->devclass;
+ xasprintf(&mesh->myport, "%d", port);
if(!node_read_public_key(mesh, mesh->self)) {
logger(NULL, MESHLINK_ERROR, "Could not read our host configuration file!");
}
// Ensure no host configuration file with that name exists
- if(config_exists(mesh, name)) {
+ if(config_exists(mesh, "current", name)) {
logger(mesh, MESHLINK_DEBUG, "A host config file for %s already exists!\n", name);
meshlink_errno = MESHLINK_EEXIST;
pthread_mutex_unlock(&(mesh->mesh_mutex));
config_t configs[5] = {NULL};
int count = 0;
- if(config_read(mesh, mesh->self->name, &configs[count])) {
+ if(config_read(mesh, "current", mesh->self->name, &configs[count], mesh->config_key)) {
count++;
}
config_t config = {outbuf, packmsg_output_size(&inv, outbuf)};
- if(!invitation_write(mesh, cookiehash, &config)) {
+ if(!invitation_write(mesh, "current", cookiehash, &config, mesh->config_key)) {
logger(mesh, MESHLINK_DEBUG, "Could not create invitation file %s: %s\n", cookiehash, strerror(errno));
meshlink_errno = MESHLINK_ESTORAGE;
pthread_mutex_unlock(&(mesh->mesh_mutex));
//Before doing meshlink_join make sure we are not connected to another mesh
if(mesh->threadstarted) {
- logger(mesh, MESHLINK_DEBUG, "Already connected to a mesh\n");
+ logger(mesh, MESHLINK_ERROR, "Cannot join while started\n");
+ meshlink_errno = MESHLINK_EINVAL;
+ pthread_mutex_unlock(&(mesh->mesh_mutex));
+ return false;
+ }
+
+ // Refuse to join a mesh if we are already part of one. We are part of one if we know at least one other node.
+ if(mesh->nodes->count > 1) {
+ logger(mesh, MESHLINK_ERROR, "Already part of an existing mesh\n");
meshlink_errno = MESHLINK_EINVAL;
pthread_mutex_unlock(&(mesh->mesh_mutex));
return false;
break;
}
- config_write(mesh, n->name, &config);
+ config_write(mesh, "current", n->name, &config, mesh->config_key);
node_add(mesh, n);
}
if(n->status.destroyed) {
meshlink_channel_close(mesh, channel);
- } else if(channel->receive_cb) {
- channel->receive_cb(mesh, channel, data, len);
+ return len;
+ }
+
+ const char *p = data;
+ size_t left = len;
+
+ while(channel->aio_receive) {
+ meshlink_aio_buffer_t *aio = channel->aio_receive;
+ size_t todo = aio->len - aio->done;
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ memcpy((char *)aio->data + aio->done, p, todo);
+ aio->done += todo;
+
+ if(aio->done == aio->len) {
+ channel->aio_receive = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
+ p += todo;
+ left -= todo;
+
+ if(!left && len) {
+ return len;
+ }
+ }
+
+ if(channel->receive_cb) {
+ channel->receive_cb(mesh, channel, p, left);
}
return len;
node_t *n = channel->node;
meshlink_handle_t *mesh = n->mesh;
+ meshlink_aio_buffer_t *aio = channel->aio_send;
- if(channel->poll_cb) {
- channel->poll_cb(mesh, channel, len);
+ if(aio) {
+ /* We at least one AIO buffer. Send as much as possible form the first buffer. */
+ size_t left = aio->len - aio->done;
+
+ if(len > left) {
+ len = left;
+ }
+
+ ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+
+ if(sent >= 0) {
+ aio->done += sent;
+ }
+
+ /* If the buffer is now completely sent, call the callback and dispose of it. */
+ if(aio->done >= aio->len) {
+ channel->aio_send = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+ } else {
+ if(channel->poll_cb) {
+ channel->poll_cb(mesh, channel, len);
+ } else {
+ utcp_set_poll_cb(connection, NULL);
+ }
}
}
void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
(void)mesh;
channel->poll_cb = cb;
- utcp_set_poll_cb(channel->c, cb ? channel_poll : NULL);
+ utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
}
void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
pthread_mutex_unlock(&mesh->mesh_mutex);
}
+void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ utcp_set_sndbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ utcp_set_rcvbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
if(data || len) {
abort(); // TODO: handle non-NULL data
}
utcp_close(channel->c);
+
+ /* Clean up any outstanding AIO buffers. */
+ for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
+ next = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
+ for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
+ next = aio->next;
+
+ if(aio->cb) {
+ aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+
+ free(aio);
+ }
+
free(channel);
}
// Then, preferably only if there is room in the receiver window,
// kick the meshlink thread to go send packets.
+ ssize_t retval;
+
pthread_mutex_lock(&mesh->mesh_mutex);
- ssize_t retval = utcp_send(channel->c, data, len);
+
+ /* Disallow direct calls to utcp_send() while we still have AIO active. */
+ if(channel->aio_send) {
+ retval = 0;
+ } else {
+ retval = utcp_send(channel->c, data, len);
+ }
+
pthread_mutex_unlock(&mesh->mesh_mutex);
if(retval < 0) {
return retval;
}
+bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_send;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ /* Ensure the poll callback is set, and call it right now to push data if possible */
+ utcp_set_poll_cb(channel->c, channel_poll);
+ channel_poll(channel->c, len);
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
+bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
if(!mesh || !channel) {
meshlink_errno = MESHLINK_EINVAL;
#endif
}
+void handle_network_change(meshlink_handle_t *mesh, bool online) {
+ (void)online;
+
+ if(!mesh->connections) {
+ return;
+ }
+
+ retry(mesh);
+}
+
static void __attribute__((constructor)) meshlink_init(void) {
crypto_init();
unsigned int seed;
}
/// Device class traits
-dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
+const dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
{ .min_connects = 3, .max_connects = 10000, .edge_weight = 1 }, // DEV_CLASS_BACKBONE
{ .min_connects = 3, .max_connects = 100, .edge_weight = 3 }, // DEV_CLASS_STATIONARY
{ .min_connects = 3, .max_connects = 3, .edge_weight = 6 }, // DEV_CLASS_PORTABLE