char *name = packmsg_get_str_dup(&in);
packmsg_skip_element(&in); /* submesh */
- int32_t devclass = packmsg_get_int32(&in);
+ dev_class_t devclass = packmsg_get_int32(&in);
uint32_t count = packmsg_get_array(&in);
if(!name) {
}
}
- if((int)devclass < 0 || devclass > _DEV_CLASS_MAX) {
+ if(devclass < 0 || devclass >= DEV_CLASS_COUNT) {
logger(NULL, MESHLINK_ERROR, "Invalid devclass given!\n");
meshlink_errno = MESHLINK_EINVAL;
return NULL;
free(params);
}
+/// Device class traits
+static const dev_class_traits_t default_class_traits[DEV_CLASS_COUNT] = {
+ { .pingtimeout = 5, .pinginterval = 60, .min_connects = 3, .max_connects = 10000, .edge_weight = 1 }, // DEV_CLASS_BACKBONE
+ { .pingtimeout = 5, .pinginterval = 60, .min_connects = 3, .max_connects = 100, .edge_weight = 3 }, // DEV_CLASS_STATIONARY
+ { .pingtimeout = 5, .pinginterval = 60, .min_connects = 3, .max_connects = 3, .edge_weight = 6 }, // DEV_CLASS_PORTABLE
+ { .pingtimeout = 5, .pinginterval = 60, .min_connects = 1, .max_connects = 1, .edge_weight = 9 }, // DEV_CLASS_UNKNOWN
+};
+
meshlink_handle_t *meshlink_open(const char *confbase, const char *name, const char *appname, dev_class_t devclass) {
if(!confbase || !*confbase) {
logger(NULL, MESHLINK_ERROR, "No confbase given!\n");
}
}
- if((int)params->devclass < 0 || params->devclass > _DEV_CLASS_MAX) {
+ if(params->devclass < 0 || params->devclass >= DEV_CLASS_COUNT) {
logger(NULL, MESHLINK_ERROR, "Invalid devclass given!\n");
meshlink_errno = MESHLINK_EINVAL;
return NULL;
mesh->netns = params->netns;
mesh->submeshes = NULL;
+ memcpy(mesh->dev_class_traits, default_class_traits, sizeof(default_class_traits));
+
if(usingname) {
mesh->name = xstrdup(params->name);
}
}
meshlink_node_t **meshlink_get_all_nodes_by_dev_class(meshlink_handle_t *mesh, dev_class_t devclass, meshlink_node_t **nodes, size_t *nmemb) {
- if(!mesh || ((int)devclass < 0) || (devclass > _DEV_CLASS_MAX) || !nmemb) {
+ if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT || !nmemb) {
meshlink_errno = MESHLINK_EINVAL;
return NULL;
}
return mesh->channel_accept_cb;
}
+static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t *aio) {
+ if(aio->data) {
+ if(aio->cb.buffer) {
+ aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+ }
+ } else {
+ if(aio->cb.fd) {
+ aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv);
+ }
+ }
+}
+
static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
meshlink_channel_t *channel = connection->priv;
if(n->status.destroyed) {
meshlink_channel_close(mesh, channel);
- } else if(channel->receive_cb) {
- channel->receive_cb(mesh, channel, data, len);
+ return len;
+ }
+
+ const char *p = data;
+ size_t left = len;
+
+ while(channel->aio_receive) {
+ meshlink_aio_buffer_t *aio = channel->aio_receive;
+ size_t todo = aio->len - aio->done;
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ if(aio->data) {
+ memcpy((char *)aio->data + aio->done, p, todo);
+ } else {
+ ssize_t result = write(aio->fd, p, todo);
+
+ if(result > 0) {
+ todo = result;
+ }
+ }
+
+ aio->done += todo;
+
+ if(aio->done == aio->len) {
+ channel->aio_receive = aio->next;
+ aio_signal(mesh, channel, aio);
+ free(aio);
+ }
+
+ p += todo;
+ left -= todo;
+
+ if(!left && len) {
+ return len;
+ }
+ }
+
+ if(channel->receive_cb) {
+ channel->receive_cb(mesh, channel, p, left);
}
return len;
node_t *n = channel->node;
meshlink_handle_t *mesh = n->mesh;
- meshlink_aio_buffer_t *aio = channel->aio;
+ meshlink_aio_buffer_t *aio = channel->aio_send;
if(aio) {
/* We at least one AIO buffer. Send as much as possible form the first buffer. */
size_t left = aio->len - aio->done;
+ ssize_t sent;
if(len > left) {
len = left;
}
- ssize_t sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+ if(aio->data) {
+ sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+ } else {
+ char buf[65536];
+ size_t todo = utcp_get_sndbuf_free(connection);
+
+ if(todo > left) {
+ todo = left;
+ }
+
+ if(todo > sizeof(buf)) {
+ todo = sizeof(buf);
+ }
+
+ ssize_t result = read(aio->fd, buf, todo);
+
+ if(result > 0) {
+ sent = utcp_send(connection, buf, result);
+ } else {
+ sent = result;
+ }
+ }
if(sent >= 0) {
aio->done += sent;
/* If the buffer is now completely sent, call the callback and dispose of it. */
if(aio->done >= aio->len) {
- channel->aio = aio->next;
-
- if(aio->cb) {
- aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
- }
-
+ channel->aio_send = aio->next;
+ aio_signal(mesh, channel, aio);
free(aio);
}
} else {
void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
(void)mesh;
channel->poll_cb = cb;
- utcp_set_poll_cb(channel->c, (cb || channel->aio) ? channel_poll : NULL);
+ utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL);
}
void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) {
pthread_mutex_unlock(&mesh->mesh_mutex);
}
+void meshlink_set_channel_sndbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ utcp_set_sndbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
+void meshlink_set_channel_rcvbuf(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t size) {
+ (void)mesh;
+
+ if(!channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ utcp_set_rcvbuf(channel->c, size);
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
meshlink_channel_t *meshlink_channel_open_ex(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port, meshlink_channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags) {
if(data || len) {
abort(); // TODO: handle non-NULL data
utcp_close(channel->c);
/* Clean up any outstanding AIO buffers. */
- for(meshlink_aio_buffer_t *aio = channel->aio, *next; aio; aio = next) {
+ for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
next = aio->next;
+ aio_signal(mesh, channel, aio);
+ free(aio);
+ }
- if(aio->cb) {
- aio->cb(mesh, channel, aio->data, aio->len, aio->priv);
- }
-
+ for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
+ next = aio->next;
+ aio_signal(mesh, channel, aio);
free(aio);
}
pthread_mutex_lock(&mesh->mesh_mutex);
/* Disallow direct calls to utcp_send() while we still have AIO active. */
- if(channel->aio) {
+ if(channel->aio_send) {
retval = 0;
} else {
retval = utcp_send(channel->c, data, len);
meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
aio->data = data;
aio->len = len;
- aio->cb = cb;
+ aio->cb.buffer = cb;
aio->priv = priv;
pthread_mutex_lock(&mesh->mesh_mutex);
/* Append the AIO buffer descriptor to the end of the chain */
- meshlink_aio_buffer_t **p = &channel->aio;
+ meshlink_aio_buffer_t **p = &channel->aio_send;
while(*p) {
p = &(*p)->next;
return true;
}
+bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || fd == -1) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->fd = fd;
+ aio->len = len;
+ aio->cb.fd = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_send;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ /* Ensure the poll callback is set, and call it right now to push data if possible */
+ utcp_set_poll_cb(channel->c, channel_poll);
+ channel_poll(channel->c, len);
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
+bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || !data) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->data = data;
+ aio->len = len;
+ aio->cb.buffer = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
+bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) {
+ if(!mesh || !channel) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ if(!len || fd == -1) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return false;
+ }
+
+ meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio));
+ aio->fd = fd;
+ aio->len = len;
+ aio->cb.fd = cb;
+ aio->priv = priv;
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+
+ /* Append the AIO buffer descriptor to the end of the chain */
+ meshlink_aio_buffer_t **p = &channel->aio_receive;
+
+ while(*p) {
+ p = &(*p)->next;
+ }
+
+ *p = aio;
+
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+
+ return true;
+}
+
uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) {
if(!mesh || !channel) {
meshlink_errno = MESHLINK_EINVAL;
#endif
}
+void meshlink_set_dev_class_timeouts(meshlink_handle_t *mesh, dev_class_t devclass, int pinginterval, int pingtimeout) {
+ if(!mesh || devclass < 0 || devclass >= DEV_CLASS_COUNT) {
+ meshlink_errno = EINVAL;
+ return;
+ }
+
+ if(pinginterval < 1 || pingtimeout < 1 || pingtimeout > pinginterval) {
+ meshlink_errno = EINVAL;
+ return;
+ }
+
+ pthread_mutex_lock(&mesh->mesh_mutex);
+ mesh->dev_class_traits[devclass].pinginterval = pinginterval;
+ mesh->dev_class_traits[devclass].pingtimeout = pingtimeout;
+ pthread_mutex_unlock(&mesh->mesh_mutex);
+}
+
void handle_network_change(meshlink_handle_t *mesh, bool online) {
(void)online;
static void __attribute__((destructor)) meshlink_exit(void) {
crypto_exit();
}
-
-/// Device class traits
-const dev_class_traits_t dev_class_traits[_DEV_CLASS_MAX + 1] = {
- { .min_connects = 3, .max_connects = 10000, .edge_weight = 1 }, // DEV_CLASS_BACKBONE
- { .min_connects = 3, .max_connects = 100, .edge_weight = 3 }, // DEV_CLASS_STATIONARY
- { .min_connects = 3, .max_connects = 3, .edge_weight = 6 }, // DEV_CLASS_PORTABLE
- { .min_connects = 1, .max_connects = 1, .edge_weight = 9 }, // DEV_CLASS_UNKNOWN
-};