From 3f30b5bc7b1477723174c2e89021fe5a2fbdf7dd Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Sun, 14 Nov 2021 17:48:20 +0100 Subject: [PATCH] Add a global metering notification callback. This callback is called when global traffic counters for a mesh exceed a configurable threshold and/or a timeout. --- src/devtools.c | 43 +++++++++++++++++++++++++++++++++++++++++ src/devtools.h | 26 +++++++++++++++++++++++++ src/meshlink.sym | 1 + src/meshlink_internal.h | 9 +++++++++ src/meta.c | 1 + src/net.c | 4 ++++ src/net_packet.c | 3 +++ src/net_socket.c | 1 + src/protocol_key.c | 4 ++++ src/route.c | 2 ++ 10 files changed, 94 insertions(+) diff --git a/src/devtools.c b/src/devtools.c index 12c5432e..899e05c1 100644 --- a/src/devtools.c +++ b/src/devtools.c @@ -407,3 +407,46 @@ void devtool_set_meta_status_cb(meshlink_handle_t *mesh, meshlink_node_status_cb mesh->meta_status_cb = cb; pthread_mutex_unlock(&mesh->mutex); } + +void devtool_set_global_metering_cb(meshlink_handle_t *mesh, meshlink_global_metering_cb_t cb, uint64_t threshold, int timeout) { + if(!mesh) { + meshlink_errno = MESHLINK_EINVAL; + } + + if(pthread_mutex_lock(&mesh->mutex) != 0) { + abort(); + } + + mesh->global_metering_cb = cb; + mesh->metering_threshold = threshold; + mesh->metering_timeout = timeout; + pthread_mutex_unlock(&mesh->mutex); +} + +void check_global_metering(meshlink_handle_t *mesh) { + uint64_t sum = + mesh->self->in_data + mesh->self->out_data + + mesh->self->in_forward + mesh->self->out_forward + + mesh->self->in_meta + mesh->self->out_meta; + + if(sum >= mesh->metering_threshold || mesh->loop.now.tv_sec >= mesh->last_metering_cb + mesh->metering_timeout) { + devtool_node_status_t status; + memset(&status, 0, sizeof status); + status.in_data = mesh->self->in_data; + status.out_data = mesh->self->out_data; + status.in_forward = mesh->self->in_forward; + status.out_forward = mesh->self->out_forward; + status.in_meta = mesh->self->in_meta; + status.out_meta = mesh->self->out_meta; + + mesh->global_metering_cb(mesh, &status); + + mesh->self->in_data = 0; + mesh->self->out_data = 0; + mesh->self->in_forward = 0; + mesh->self->out_forward = 0; + mesh->self->in_meta = 0; + mesh->self->out_meta = 0; + mesh->last_metering_cb = mesh->loop.now.tv_sec; + } +} \ No newline at end of file diff --git a/src/devtools.h b/src/devtools.h index 78e36428..5bcd23be 100644 --- a/src/devtools.h +++ b/src/devtools.h @@ -227,4 +227,30 @@ extern void (*devtool_set_inviter_commits_first)(bool inviter_commited_first); */ void devtool_set_meta_status_cb(struct meshlink_handle *mesh, meshlink_node_status_cb_t cb); +/// A callback notifying about global traffic counters. +/* @param mesh A handle which represents an instance of MeshLink.. + * @param data A pointer to a data structure containing global traffic counters. + * The application must only access this data during the callback, + * the pointer is no longer valid after the callback function returns. + * The counters will be reset after the callback function returns. + */ +typedef void (*meshlink_global_metering_cb_t)(meshlink_handle_t *mesh, const struct devtool_node_status *data); + +/// Set the global metering callback. +/** This functions sets the callback that is called when a certain amount of traffic has occured or a timeout has passed. + * + * The callback is run in MeshLink's own thread. + * It is important that the callback uses apprioriate methods (queues, pipes, locking, etc.) + * to hand the data over to the application's thread. + * The callback should also not block itself and return as quickly as possible. + * + * \memberof meshlink_handle + * @param mesh A handle which represents an instance of MeshLink.. + * @param cb A pointer to the function which will be called when there is a traffic counter update.. + * If a NULL pointer is given, the callback will be disabled. + * @param threshold Threshold in bytes after which the callback will be called. + * @param timeout Timeout in seconds after which the callback will be called if the threshold was not reached before that time. + */ +void devtool_set_global_metering_cb(meshlink_handle_t *mesh, meshlink_global_metering_cb_t cb, uint64_t threshold, int timeout); + #endif diff --git a/src/meshlink.sym b/src/meshlink.sym index 8ece4dc5..fef59f2d 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -6,6 +6,7 @@ devtool_get_node_status devtool_keyrotate_probe devtool_open_in_netns devtool_reset_node_counters +devtool_set_global_metering_cb devtool_set_meta_status_cb devtool_set_inviter_commits_first devtool_trybind_probe diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index b57f212c..d0d7fbc4 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -77,6 +77,10 @@ typedef struct { int edge_weight; } dev_class_traits_t; +struct devtool_node_status; +void check_global_metering(meshlink_handle_t *mesh); +typedef void (*meshlink_global_metering_cb_t)(meshlink_handle_t *mesh, const struct devtool_node_status *data); + /// A handle for an instance of MeshLink. struct meshlink_handle { // public members @@ -130,6 +134,10 @@ struct meshlink_handle { int next_pit; int pits[10]; + uint64_t metering_threshold; + time_t last_metering_cb; + int metering_timeout; + // Infrequently used callbacks meshlink_node_status_cb_t node_status_cb; meshlink_node_status_cb_t meta_status_cb; @@ -141,6 +149,7 @@ struct meshlink_handle { meshlink_error_cb_t error_cb; meshlink_blacklisted_cb_t blacklisted_cb; meshlink_thread_status_cb_t thread_status_cb; + meshlink_global_metering_cb_t global_metering_cb; // Mesh parameters char *appname; diff --git a/src/meta.c b/src/meta.c index ffd835c4..da0e267f 100644 --- a/src/meta.c +++ b/src/meta.c @@ -152,6 +152,7 @@ bool receive_meta(meshlink_handle_t *mesh, connection_t *c) { if(c->node) { c->node->in_meta += inlen; + mesh->self->in_meta += inlen; } if(c->allow_request == ID) { diff --git a/src/net.c b/src/net.c index 35cfc650..d69e8a55 100644 --- a/src/net.c +++ b/src/net.c @@ -164,6 +164,10 @@ static void timeout_handler(event_loop_t *loop, void *data) { } } + if(mesh->global_metering_cb) { + check_global_metering(mesh); + } + timeout_set(&mesh->loop, data, &(struct timespec) { 1, prng(mesh, TIMER_FUDGE) }); diff --git a/src/net_packet.c b/src/net_packet.c index acc90c97..c37b3b45 100644 --- a/src/net_packet.c +++ b/src/net_packet.c @@ -145,6 +145,7 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) { logger(mesh, MESHLINK_DEBUG, "Sending MTU probe length %d to %s", len, n->name); n->out_meta += packet.len + PROBE_OVERHEAD; + mesh->self->out_meta += packet.len + PROBE_OVERHEAD; send_udppacket(mesh, n, &packet); } @@ -165,6 +166,7 @@ void send_mtu_probe(meshlink_handle_t *mesh, node_t *n) { static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet, uint16_t len) { n->in_meta += len + PROBE_OVERHEAD; + mesh->self->in_meta += len + PROBE_OVERHEAD; if(len < 64) { logger(mesh, MESHLINK_WARNING, "Got too short MTU probe length %d from %s", packet->len, n->name); @@ -185,6 +187,7 @@ static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet n->status.udp_confirmed = true; logger(mesh, MESHLINK_DEBUG, "Sending MTU probe reply %d to %s", packet->len, n->name); n->out_meta += packet->len + PROBE_OVERHEAD; + mesh->self->out_meta += packet->len + PROBE_OVERHEAD; send_udppacket(mesh, n, packet); n->status.udp_confirmed = udp_confirmed; } else { diff --git a/src/net_socket.c b/src/net_socket.c index 8b1263b0..7d1b6224 100644 --- a/src/net_socket.c +++ b/src/net_socket.c @@ -138,6 +138,7 @@ static void handle_meta_write(meshlink_handle_t *mesh, connection_t *c) { if(c->node) { c->node->out_meta += outlen; + mesh->self->out_meta += outlen; } buffer_read(&c->outbuf, outlen); diff --git a/src/protocol_key.c b/src/protocol_key.c index 745845a6..92789efc 100644 --- a/src/protocol_key.c +++ b/src/protocol_key.c @@ -348,6 +348,8 @@ bool req_key_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { size_t len = strlen(request); from->in_forward += len + SPTPS_OVERHEAD; to->out_forward += len + SPTPS_OVERHEAD; + mesh->self->in_forward += len + SPTPS_OVERHEAD; + mesh->self->out_forward += len + SPTPS_OVERHEAD; send_request(mesh, to->nexthop->connection, NULL, "%s", request); } @@ -419,6 +421,8 @@ bool ans_key_h(meshlink_handle_t *mesh, connection_t *c, const char *request) { size_t len = strlen(request); from->in_forward += len + SPTPS_OVERHEAD; to->out_forward += len + SPTPS_OVERHEAD; + mesh->self->in_forward += len + SPTPS_OVERHEAD; + mesh->self->out_forward += len + SPTPS_OVERHEAD; /* Append the known UDP address of the from node, if we have a confirmed one */ if(!*address && from->status.udp_confirmed && from->address.sa.sa_family != AF_UNSPEC) { diff --git a/src/route.c b/src/route.c index c63c0373..83d25e6e 100644 --- a/src/route.c +++ b/src/route.c @@ -62,10 +62,12 @@ void route(meshlink_handle_t *mesh, node_t *source, vpn_packet_t *packet) { // Channel traffic accounting if(source == mesh->self) { dest->out_data += len + SPTPS_OVERHEAD; + mesh->self->out_data += len + SPTPS_OVERHEAD; } if(dest == mesh->self) { source->in_data += len + SPTPS_OVERHEAD; + mesh->self->in_data += len + SPTPS_OVERHEAD; const void *payload = packet->data + sizeof(*hdr); char hex[len * 2 + 1]; -- 2.39.2