From 947f09ff2c507a80bbe7f92ed0d41b06c98d5375 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Sun, 13 Oct 2019 23:32:03 +0200 Subject: [PATCH] Allow the mesh to detect when a node has completely restarted. When calling meshlink_open(), a node creates a unique ID that is passed along ADD_EDGE messages. When a node becomes unreachable and then reachable again, this allows other nodes to detect whether it was just a temporary network issue, or whether the node completely restarted (either because meshlink_close() was called or because it crashed). At the moment, when this is detected, other nodes close all open channels with the restarted node. --- src/edge.h | 1 + src/graph.c | 9 +++++++++ src/meshlink.c | 7 +++++++ src/meshlink_internal.h | 1 + src/node.h | 3 ++- src/protocol_edge.c | 24 ++++++++++++++++-------- test/channels-failure.c | 31 +++++++++++++++++++++++++++++-- 7 files changed, 65 insertions(+), 11 deletions(-) diff --git a/src/edge.h b/src/edge.h index 2e59fa1d..ba18e875 100644 --- a/src/edge.h +++ b/src/edge.h @@ -34,6 +34,7 @@ typedef struct edge_t { struct edge_t *reverse; /* edge in the opposite direction, if available */ int weight; /* weight of this edge */ + uint32_t session_id; /* the session_id of the from node */ } edge_t; extern void init_edges(struct meshlink_handle *mesh); diff --git a/src/graph.c b/src/graph.c index 60ac3adf..d86ac4a8 100644 --- a/src/graph.c +++ b/src/graph.c @@ -196,6 +196,15 @@ static void check_reachability(meshlink_handle_t *mesh) { /* Check reachability status. */ for splay_each(node_t, n, mesh->nodes) { + /* Check for nodes that have changed session_id */ + if(n->status.visited && n->prevedge && n->prevedge->reverse->session_id != n->session_id) { + n->session_id = n->prevedge->reverse->session_id; + + if(n->utcp) { + utcp_abort_all_connections(n->utcp); + } + } + if(n->status.visited != n->status.reachable) { n->status.reachable = !n->status.reachable; n->last_state_change = mesh->loop.now.tv_sec; diff --git a/src/meshlink.c b/src/meshlink.c index f1b6a334..0318a497 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -889,6 +889,7 @@ static bool meshlink_setup(meshlink_handle_t *mesh) { mesh->self->name = xstrdup(mesh->name); mesh->self->devclass = mesh->devclass; mesh->self->ecdsa = ecdsa_set_public_key(ecdsa_get_public_key(mesh->private_key)); + mesh->self->session_id = mesh->session_id; if(!write_main_config_files(mesh)) { logger(mesh, MESHLINK_ERROR, "Could not write main config files into %s/current: %s\n", mesh->confbase, strerror(errno)); @@ -967,6 +968,7 @@ static bool meshlink_read_config(meshlink_handle_t *mesh) { mesh->self = new_node(); mesh->self->name = xstrdup(name); mesh->self->devclass = mesh->devclass; + mesh->self->session_id = mesh->session_id; if(!node_read_public_key(mesh, mesh->self)) { logger(NULL, MESHLINK_ERROR, "Could not read our host configuration file!"); @@ -1288,6 +1290,10 @@ meshlink_handle_t *meshlink_open_ex(const meshlink_open_params_t *params) { randomize(&mesh->prng_state, sizeof(mesh->prng_state)); + do { + randomize(&mesh->session_id, sizeof(mesh->session_id)); + } while(mesh->session_id == 0); + memcpy(mesh->dev_class_traits, default_class_traits, sizeof(default_class_traits)); if(usingname) { @@ -2221,6 +2227,7 @@ bool meshlink_set_port(meshlink_handle_t *mesh, int port) { mesh->self = new_node(); mesh->self->name = xstrdup(mesh->name); mesh->self->devclass = mesh->devclass; + mesh->self->session_id = mesh->session_id; xasprintf(&mesh->myport, "%d", port); if(!node_read_public_key(mesh, mesh->self)) { diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index dd8c7d32..bad58255 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -133,6 +133,7 @@ struct meshlink_handle { struct connection_t *everyone; uint64_t prng_state[4]; + uint32_t session_id; int next_pit; int pits[10]; diff --git a/src/node.h b/src/node.h index e42aac4a..50ee141c 100644 --- a/src/node.h +++ b/src/node.h @@ -50,8 +50,9 @@ typedef struct node_t { dev_class_t devclass; // Used for packet I/O - sptps_t sptps; int sock; /* Socket to use for outgoing UDP packets */ + uint32_t session_id; /* Unique ID for this node's currently running process */ + sptps_t sptps; sockaddr_t address; /* his real (internet) ip to send UDP packets to */ struct utcp *utcp; diff --git a/src/protocol_edge.c b/src/protocol_edge.c index e817114b..6be658ee 100644 --- a/src/protocol_edge.c +++ b/src/protocol_edge.c @@ -74,9 +74,9 @@ bool send_add_edge(meshlink_handle_t *mesh, connection_t *c, const edge_t *e, in s = e->to->submesh; } - x = send_request(mesh, c, s, "%d %x %s %d %s %s %s %s %d %s %x %d %d", ADD_EDGE, prng(mesh, UINT_MAX), + x = send_request(mesh, c, s, "%d %x %s %d %s %s %s %s %d %s %x %d %d %x", ADD_EDGE, prng(mesh, UINT_MAX), e->from->name, e->from->devclass, from_submesh, e->to->name, address, port, - e->to->devclass, to_submesh, OPTION_PMTU_DISCOVERY, e->weight, contradictions); + e->to->devclass, to_submesh, OPTION_PMTU_DISCOVERY, e->weight, contradictions, e->from->session_id); free(address); free(port); @@ -100,11 +100,12 @@ bool add_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { sockaddr_t address; int weight; int contradictions = 0; + uint32_t session_id = 0; submesh_t *s = NULL; - if(sscanf(request, "%*d %*x "MAX_STRING" %d "MAX_STRING" "MAX_STRING" "MAX_STRING" "MAX_STRING" %d "MAX_STRING" %*x %d %d", + if(sscanf(request, "%*d %*x "MAX_STRING" %d "MAX_STRING" "MAX_STRING" "MAX_STRING" "MAX_STRING" %d "MAX_STRING" %*x %d %d %x", from_name, &from_devclass, from_submesh_name, to_name, to_address, to_port, &to_devclass, to_submesh_name, - &weight, &contradictions) < 9) { + &weight, &contradictions, &session_id) < 9) { logger(mesh, MESHLINK_ERROR, "Got bad %s from %s", "ADD_EDGE", c->name); return false; } @@ -164,6 +165,10 @@ bool add_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { from->devclass = from_devclass; + if(!from->session_id) { + from->session_id = session_id; + } + if(!to) { to = new_node(); to->status.dirty = true; @@ -194,7 +199,7 @@ bool add_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { e = lookup_edge(from, to); if(e) { - if(e->weight != weight || sockaddrcmp(&e->address, &address)) { + if(e->weight != weight || e->session_id != session_id || sockaddrcmp(&e->address, &address)) { if(from == mesh->self) { logger(mesh, MESHLINK_WARNING, "Got %s from %s for ourself which does not match existing entry", "ADD_EDGE", c->name); @@ -215,6 +220,7 @@ bool add_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { e = new_edge(); e->from = from; e->to = to; + e->session_id = session_id; send_del_edge(mesh, c, e, mesh->contradicting_add_edge); free_edge(e); return true; @@ -225,6 +231,7 @@ bool add_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { e->to = to; e->address = address; e->weight = weight; + e->session_id = session_id; edge_add(mesh, e); /* Run MST before or after we tell the rest? */ @@ -273,8 +280,8 @@ bool send_del_edge(meshlink_handle_t *mesh, connection_t *c, const edge_t *e, in s = e->to->submesh; } - return send_request(mesh, c, s, "%d %x %s %s %d", DEL_EDGE, prng(mesh, UINT_MAX), - e->from->name, e->to->name, contradictions); + return send_request(mesh, c, s, "%d %x %s %s %d %x", DEL_EDGE, prng(mesh, UINT_MAX), + e->from->name, e->to->name, contradictions, e->session_id); } bool del_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { @@ -286,9 +293,10 @@ bool del_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { char to_name[MAX_STRING_SIZE]; node_t *from, *to; int contradictions = 0; + uint32_t session_id = 0; submesh_t *s = NULL; - if(sscanf(request, "%*d %*x "MAX_STRING" "MAX_STRING" %d", from_name, to_name, &contradictions) < 2) { + if(sscanf(request, "%*d %*x "MAX_STRING" "MAX_STRING" %d %x", from_name, to_name, &contradictions, &session_id) < 2) { logger(mesh, MESHLINK_ERROR, "Got bad %s from %s", "DEL_EDGE", c->name); return false; } diff --git a/test/channels-failure.c b/test/channels-failure.c index 18aef2d5..5c980055 100644 --- a/test/channels-failure.c +++ b/test/channels-failure.c @@ -87,8 +87,8 @@ int main() { // Try setting up a new channel while b is still down. - poll_flag.flag = false; - receive_flag.flag = false; + set_sync_flag(&poll_flag, false); + set_sync_flag(&receive_flag, false); channel = meshlink_channel_open(mesh_a, b, 7, NULL, NULL, 0); assert(channel); @@ -98,6 +98,33 @@ int main() { assert(wait_sync_flag(&poll_flag, 5)); assert(poll_len == 0); + meshlink_channel_close(mesh_a, channel); + + // Restart b and create a new channel + + set_sync_flag(&poll_flag, false); + set_sync_flag(&receive_flag, false); + + meshlink_set_node_channel_timeout(mesh_a, b, 60); + + assert(meshlink_start(mesh_b)); + + channel = meshlink_channel_open(mesh_a, b, 7, receive_cb, NULL, 0); + meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb); + assert(channel); + assert(wait_sync_flag(&poll_flag, 10)); + assert(poll_len != 0); + + // Close and reopen b, we should get a fast notification that the channel has been closed. + + meshlink_close(mesh_b); + mesh_b = meshlink_open("channels_failure_conf.2", "b", "channels_failure", DEV_CLASS_BACKBONE); + assert(mesh_b); + assert(meshlink_start(mesh_b)); + + assert(wait_sync_flag(&receive_flag, 10)); + assert(receive_len == 0); + // Clean up. close_meshlink_pair(mesh_a, mesh_b); -- 2.39.2