struct edge_t *reverse; /* edge in the opposite direction, if available */
int weight; /* weight of this edge */
+ uint32_t session_id; /* the session_id of the from node */
} edge_t;
extern void init_edges(struct meshlink_handle *mesh);
/* Check reachability status. */
for splay_each(node_t, n, mesh->nodes) {
+ /* Check for nodes that have changed session_id */
+ if(n->status.visited && n->prevedge && n->prevedge->reverse->session_id != n->session_id) {
+ n->session_id = n->prevedge->reverse->session_id;
+
+ if(n->utcp) {
+ utcp_abort_all_connections(n->utcp);
+ }
+ }
+
if(n->status.visited != n->status.reachable) {
n->status.reachable = !n->status.reachable;
n->last_state_change = mesh->loop.now.tv_sec;
mesh->self->name = xstrdup(mesh->name);
mesh->self->devclass = mesh->devclass;
mesh->self->ecdsa = ecdsa_set_public_key(ecdsa_get_public_key(mesh->private_key));
+ mesh->self->session_id = mesh->session_id;
if(!write_main_config_files(mesh)) {
logger(mesh, MESHLINK_ERROR, "Could not write main config files into %s/current: %s\n", mesh->confbase, strerror(errno));
mesh->self = new_node();
mesh->self->name = xstrdup(name);
mesh->self->devclass = mesh->devclass;
+ mesh->self->session_id = mesh->session_id;
if(!node_read_public_key(mesh, mesh->self)) {
logger(NULL, MESHLINK_ERROR, "Could not read our host configuration file!");
randomize(&mesh->prng_state, sizeof(mesh->prng_state));
+ do {
+ randomize(&mesh->session_id, sizeof(mesh->session_id));
+ } while(mesh->session_id == 0);
+
memcpy(mesh->dev_class_traits, default_class_traits, sizeof(default_class_traits));
if(usingname) {
mesh->self = new_node();
mesh->self->name = xstrdup(mesh->name);
mesh->self->devclass = mesh->devclass;
+ mesh->self->session_id = mesh->session_id;
xasprintf(&mesh->myport, "%d", port);
if(!node_read_public_key(mesh, mesh->self)) {
struct connection_t *everyone;
uint64_t prng_state[4];
+ uint32_t session_id;
int next_pit;
int pits[10];
dev_class_t devclass;
// Used for packet I/O
- sptps_t sptps;
int sock; /* Socket to use for outgoing UDP packets */
+ uint32_t session_id; /* Unique ID for this node's currently running process */
+ sptps_t sptps;
sockaddr_t address; /* his real (internet) ip to send UDP packets to */
struct utcp *utcp;
s = e->to->submesh;
}
- x = send_request(mesh, c, s, "%d %x %s %d %s %s %s %s %d %s %x %d %d", ADD_EDGE, prng(mesh, UINT_MAX),
+ x = send_request(mesh, c, s, "%d %x %s %d %s %s %s %s %d %s %x %d %d %x", ADD_EDGE, prng(mesh, UINT_MAX),
e->from->name, e->from->devclass, from_submesh, e->to->name, address, port,
- e->to->devclass, to_submesh, OPTION_PMTU_DISCOVERY, e->weight, contradictions);
+ e->to->devclass, to_submesh, OPTION_PMTU_DISCOVERY, e->weight, contradictions, e->from->session_id);
free(address);
free(port);
sockaddr_t address;
int weight;
int contradictions = 0;
+ uint32_t session_id = 0;
submesh_t *s = NULL;
- if(sscanf(request, "%*d %*x "MAX_STRING" %d "MAX_STRING" "MAX_STRING" "MAX_STRING" "MAX_STRING" %d "MAX_STRING" %*x %d %d",
+ if(sscanf(request, "%*d %*x "MAX_STRING" %d "MAX_STRING" "MAX_STRING" "MAX_STRING" "MAX_STRING" %d "MAX_STRING" %*x %d %d %x",
from_name, &from_devclass, from_submesh_name, to_name, to_address, to_port, &to_devclass, to_submesh_name,
- &weight, &contradictions) < 9) {
+ &weight, &contradictions, &session_id) < 9) {
logger(mesh, MESHLINK_ERROR, "Got bad %s from %s", "ADD_EDGE", c->name);
return false;
}
from->devclass = from_devclass;
+ if(!from->session_id) {
+ from->session_id = session_id;
+ }
+
if(!to) {
to = new_node();
to->status.dirty = true;
e = lookup_edge(from, to);
if(e) {
- if(e->weight != weight || sockaddrcmp(&e->address, &address)) {
+ if(e->weight != weight || e->session_id != session_id || sockaddrcmp(&e->address, &address)) {
if(from == mesh->self) {
logger(mesh, MESHLINK_WARNING, "Got %s from %s for ourself which does not match existing entry",
"ADD_EDGE", c->name);
e = new_edge();
e->from = from;
e->to = to;
+ e->session_id = session_id;
send_del_edge(mesh, c, e, mesh->contradicting_add_edge);
free_edge(e);
return true;
e->to = to;
e->address = address;
e->weight = weight;
+ e->session_id = session_id;
edge_add(mesh, e);
/* Run MST before or after we tell the rest? */
s = e->to->submesh;
}
- return send_request(mesh, c, s, "%d %x %s %s %d", DEL_EDGE, prng(mesh, UINT_MAX),
- e->from->name, e->to->name, contradictions);
+ return send_request(mesh, c, s, "%d %x %s %s %d %x", DEL_EDGE, prng(mesh, UINT_MAX),
+ e->from->name, e->to->name, contradictions, e->session_id);
}
bool del_edge_h(meshlink_handle_t *mesh, connection_t *c, const char *request) {
char to_name[MAX_STRING_SIZE];
node_t *from, *to;
int contradictions = 0;
+ uint32_t session_id = 0;
submesh_t *s = NULL;
- if(sscanf(request, "%*d %*x "MAX_STRING" "MAX_STRING" %d", from_name, to_name, &contradictions) < 2) {
+ if(sscanf(request, "%*d %*x "MAX_STRING" "MAX_STRING" %d %x", from_name, to_name, &contradictions, &session_id) < 2) {
logger(mesh, MESHLINK_ERROR, "Got bad %s from %s", "DEL_EDGE", c->name);
return false;
}
// Try setting up a new channel while b is still down.
- poll_flag.flag = false;
- receive_flag.flag = false;
+ set_sync_flag(&poll_flag, false);
+ set_sync_flag(&receive_flag, false);
channel = meshlink_channel_open(mesh_a, b, 7, NULL, NULL, 0);
assert(channel);
assert(wait_sync_flag(&poll_flag, 5));
assert(poll_len == 0);
+ meshlink_channel_close(mesh_a, channel);
+
+ // Restart b and create a new channel
+
+ set_sync_flag(&poll_flag, false);
+ set_sync_flag(&receive_flag, false);
+
+ meshlink_set_node_channel_timeout(mesh_a, b, 60);
+
+ assert(meshlink_start(mesh_b));
+
+ channel = meshlink_channel_open(mesh_a, b, 7, receive_cb, NULL, 0);
+ meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb);
+ assert(channel);
+ assert(wait_sync_flag(&poll_flag, 10));
+ assert(poll_len != 0);
+
+ // Close and reopen b, we should get a fast notification that the channel has been closed.
+
+ meshlink_close(mesh_b);
+ mesh_b = meshlink_open("channels_failure_conf.2", "b", "channels_failure", DEV_CLASS_BACKBONE);
+ assert(mesh_b);
+ assert(meshlink_start(mesh_b));
+
+ assert(wait_sync_flag(&receive_flag, 10));
+ assert(receive_len == 0);
+
// Clean up.
close_meshlink_pair(mesh_a, mesh_b);