event_loop_start(&mesh->loop);
- if(pthread_create(&mesh->thread, NULL, meshlink_main_loop, mesh) != 0) {
+ // Ensure we have a decent amount of stack space. Musl's default of 80 kB is too small.
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setstacksize(&attr, 1024 * 1024);
+
+ if(pthread_create(&mesh->thread, &attr, meshlink_main_loop, mesh) != 0) {
logger(mesh, MESHLINK_DEBUG, "Could not start thread: %s\n", strerror(errno));
memset(&mesh->thread, 0, sizeof(mesh)->thread);
meshlink_errno = MESHLINK_EINTERNAL;
if(aio->data) {
if(aio->cb.buffer) {
- aio->cb.buffer(mesh, channel, aio->data, aio->len, aio->priv);
+ aio->cb.buffer(mesh, channel, aio->data, aio->done, aio->priv);
}
} else {
if(aio->cb.fd) {
if(aio->data) {
sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
} else {
+ /* Limit the amount we read at once to avoid stack overflows */
+ if(todo > 65536) {
+ todo = 65536;
+ }
+
char buf[todo];
ssize_t result = read(aio->fd, buf, todo);
if(sent != (ssize_t)todo) {
/* We should never get a partial send at this point */
- assert(sent < 0);
+ assert(sent <= 0);
/* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
if(!aio_abort(mesh, channel, &channel->aio_send)) {
return -1;
}
- if(!len) {
- return 0;
- }
-
- if(!data) {
+ if(len && !data) {
meshlink_errno = MESHLINK_EINVAL;
return -1;
}
meshlink_errno = MESHLINK_ENETWORK;
}
+ if(utcp_get_flush_needed(channel->c)) {
+ signal_trigger(&mesh->loop, &mesh->datafromapp);
+ }
+
return retval;
}
pthread_mutex_unlock(&mesh->mutex);
}
+void meshlink_set_node_flush_timeout(meshlink_handle_t *mesh, meshlink_node_t *node, int timeout) {
+ if(!mesh || !node) {
+ meshlink_errno = MESHLINK_EINVAL;
+ return;
+ }
+
+ node_t *n = (node_t *)node;
+
+ pthread_mutex_lock(&mesh->mutex);
+
+ if(!n->utcp) {
+ n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);
+ utcp_set_mtu(n->utcp, n->mtu - sizeof(meshlink_packethdr_t));
+ utcp_set_retransmit_cb(n->utcp, channel_retransmit);
+ }
+
+ utcp_set_flush_timeout(n->utcp, timeout);
+
+ pthread_mutex_unlock(&mesh->mutex);
+}
+
void update_node_status(meshlink_handle_t *mesh, node_t *n) {
if(n->status.reachable && mesh->channel_accept_cb && !n->utcp) {
n->utcp = utcp_init(channel_accept, channel_pre_accept, channel_send, n);