]> git.meshlink.io Git - meshlink/commitdiff
Add a global metering notification callback. feature/metering-callback
authorGuus Sliepen <guus@meshlink.io>
Sun, 14 Nov 2021 16:48:20 +0000 (17:48 +0100)
committerGuus Sliepen <guus@meshlink.io>
Sun, 14 Nov 2021 16:48:20 +0000 (17:48 +0100)
This callback is called when global traffic counters for a mesh exceed a
configurable threshold and/or a timeout.

src/devtools.c
src/devtools.h
src/meshlink.sym
src/meshlink_internal.h
src/meta.c
src/net.c
src/net_packet.c
src/net_socket.c
src/protocol_key.c
src/route.c

index 12c5432e2ca8dbf9ae7ece2accb612fa543c15e6..899e05c1994f14964064ce73ab64f2d963d837fd 100644 (file)
@@ -407,3 +407,46 @@ void devtool_set_meta_status_cb(meshlink_handle_t *mesh, meshlink_node_status_cb
        mesh->meta_status_cb = cb;
        pthread_mutex_unlock(&mesh->mutex);
 }
+
+void devtool_set_global_metering_cb(meshlink_handle_t *mesh, meshlink_global_metering_cb_t cb, uint64_t threshold, int timeout) {
+       if(!mesh) {
+               meshlink_errno = MESHLINK_EINVAL;
+       }
+
+       if(pthread_mutex_lock(&mesh->mutex) != 0) {
+               abort();
+       }
+
+       mesh->global_metering_cb = cb;
+       mesh->metering_threshold = threshold;
+       mesh->metering_timeout = timeout;
+       pthread_mutex_unlock(&mesh->mutex);
+}
+
+void check_global_metering(meshlink_handle_t *mesh) {
+       uint64_t sum =
+               mesh->self->in_data + mesh->self->out_data +
+               mesh->self->in_forward + mesh->self->out_forward +
+               mesh->self->in_meta + mesh->self->out_meta;
+
+       if(sum >= mesh->metering_threshold || mesh->loop.now.tv_sec >= mesh->last_metering_cb + mesh->metering_timeout) {
+               devtool_node_status_t status;
+               memset(&status, 0, sizeof status);
+               status.in_data = mesh->self->in_data;
+               status.out_data = mesh->self->out_data;
+               status.in_forward = mesh->self->in_forward;
+               status.out_forward = mesh->self->out_forward;
+               status.in_meta = mesh->self->in_meta;
+               status.out_meta = mesh->self->out_meta;
+
+               mesh->global_metering_cb(mesh, &status);
+
+               mesh->self->in_data = 0;
+               mesh->self->out_data = 0;
+               mesh->self->in_forward = 0;
+               mesh->self->out_forward = 0;
+               mesh->self->in_meta = 0;
+               mesh->self->out_meta = 0;
+               mesh->last_metering_cb = mesh->loop.now.tv_sec;
+       }
+}
\ No newline at end of file
index 78e3642859e50301ce84399419735eb579337fe1..5bcd23bea2511c02a356a8f0d183c58338635de3 100644 (file)
@@ -227,4 +227,30 @@ extern void (*devtool_set_inviter_commits_first)(bool inviter_commited_first);
  */
 void devtool_set_meta_status_cb(struct meshlink_handle *mesh, meshlink_node_status_cb_t cb);
 
+/// A callback notifying about global traffic counters.
+/*  @param mesh   A handle which represents an instance of MeshLink..
+ *  @param data   A pointer to a data structure containing global traffic counters.
+ *                The application must only access this data during the callback,
+ *                the pointer is no longer valid after the callback function returns.
+ *                The counters will be reset after the callback function returns.
+ */
+typedef void (*meshlink_global_metering_cb_t)(meshlink_handle_t *mesh, const struct devtool_node_status *data);
+
+/// Set the global metering callback.
+/** This functions sets the callback that is called when a certain amount of traffic has occured or a timeout has passed.
+ *
+ *  The callback is run in MeshLink's own thread.
+ *  It is important that the callback uses apprioriate methods (queues, pipes, locking, etc.)
+ *  to hand the data over to the application's thread.
+ *  The callback should also not block itself and return as quickly as possible.
+ *
+ *  \memberof meshlink_handle
+ *  @param mesh      A handle which represents an instance of MeshLink..
+ *  @param cb        A pointer to the function which will be called when there is a traffic counter update..
+ *                   If a NULL pointer is given, the callback will be disabled.
+ *  @param threshold Threshold in bytes after which the callback will be called.
+ *  @param timeout   Timeout in seconds after which the callback will be called if the threshold was not reached before that time.
+ */
+void devtool_set_global_metering_cb(meshlink_handle_t *mesh, meshlink_global_metering_cb_t cb, uint64_t threshold, int timeout);
+
 #endif
index 8ece4dc505593333273745758f36843cc89b62af..fef59f2d3cb15a84a6c5e8f65dbd00cf40e85cdf 100644 (file)
@@ -6,6 +6,7 @@ devtool_get_node_status
 devtool_keyrotate_probe
 devtool_open_in_netns
 devtool_reset_node_counters
+devtool_set_global_metering_cb
 devtool_set_meta_status_cb
 devtool_set_inviter_commits_first
 devtool_trybind_probe
index b57f212c5fdf6569243c2256497f8e11f5514d6f..d0d7fbc4fb10b14031aee47ee88bb4a711071f83 100644 (file)
@@ -77,6 +77,10 @@ typedef struct {
        int edge_weight;
 } dev_class_traits_t;
 
+struct devtool_node_status;
+void check_global_metering(meshlink_handle_t *mesh);
+typedef void (*meshlink_global_metering_cb_t)(meshlink_handle_t *mesh, const struct devtool_node_status *data);
+
 /// A handle for an instance of MeshLink.
 struct meshlink_handle {
        // public members
@@ -130,6 +134,10 @@ struct meshlink_handle {
        int next_pit;
        int pits[10];
 
+       uint64_t metering_threshold;
+       time_t last_metering_cb;
+       int metering_timeout;
+
        // Infrequently used callbacks
        meshlink_node_status_cb_t node_status_cb;
        meshlink_node_status_cb_t meta_status_cb;
@@ -141,6 +149,7 @@ struct meshlink_handle {
        meshlink_error_cb_t error_cb;
        meshlink_blacklisted_cb_t blacklisted_cb;
        meshlink_thread_status_cb_t thread_status_cb;
+       meshlink_global_metering_cb_t global_metering_cb;
 
        // Mesh parameters
        char *appname;
index ffd835c46e6bdbf108553aa28e7d7ff0be321daa..da0e267fb11349c56fcc180150492c292f51b710 100644 (file)
@@ -152,6 +152,7 @@ bool receive_meta(meshlink_handle_t *mesh, connection_t *c) {
 
        if(c->node) {
                c->node->in_meta += inlen;
+               mesh->self->in_meta += inlen;
        }
 
        if(c->allow_request == ID) {
index 35cfc6502a6b12091fd36ed8d9353aa38765b3ce..d69e8a55a3906e1091ace0e553d19cf5067a642d 100644 (file)
--- a/src/net.c
+++ b/src/net.c
@@ -164,6 +164,10 @@ static void timeout_handler(event_loop_t *loop, void *data) {
                }
        }
 
+       if(mesh->global_metering_cb) {
+               check_global_metering(mesh);
+       }
+
        timeout_set(&mesh->loop, data, &(struct timespec) {
                1, prng(mesh, TIMER_FUDGE)
        });
index acc90c97497668f00e491d724d801626e2648402..c37b3b45f6816b6bedd01f0897a0f66406986528 100644 (file)
@@ -145,6 +145,7 @@ static void send_mtu_probe_handler(event_loop_t *loop, void *data) {
                logger(mesh, MESHLINK_DEBUG, "Sending MTU probe length %d to %s", len, n->name);
 
                n->out_meta += packet.len + PROBE_OVERHEAD;
+               mesh->self->out_meta += packet.len + PROBE_OVERHEAD;
                send_udppacket(mesh, n, &packet);
        }
 
@@ -165,6 +166,7 @@ void send_mtu_probe(meshlink_handle_t *mesh, node_t *n) {
 
 static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet, uint16_t len) {
        n->in_meta += len + PROBE_OVERHEAD;
+       mesh->self->in_meta += len + PROBE_OVERHEAD;
 
        if(len < 64) {
                logger(mesh, MESHLINK_WARNING, "Got too short MTU probe length %d from %s", packet->len, n->name);
@@ -185,6 +187,7 @@ static void mtu_probe_h(meshlink_handle_t *mesh, node_t *n, vpn_packet_t *packet
                n->status.udp_confirmed = true;
                logger(mesh, MESHLINK_DEBUG, "Sending MTU probe reply %d to %s", packet->len, n->name);
                n->out_meta += packet->len + PROBE_OVERHEAD;
+               mesh->self->out_meta += packet->len + PROBE_OVERHEAD;
                send_udppacket(mesh, n, packet);
                n->status.udp_confirmed = udp_confirmed;
        } else {
index 8b1263b063fb8df8c5900e68a09361921ac5e636..7d1b62244fc5b5513fd67f18cfdeacb68c7cfefd 100644 (file)
@@ -138,6 +138,7 @@ static void handle_meta_write(meshlink_handle_t *mesh, connection_t *c) {
 
        if(c->node) {
                c->node->out_meta += outlen;
+               mesh->self->out_meta += outlen;
        }
 
        buffer_read(&c->outbuf, outlen);
index 745845a650ebd0453660e53301a0cbc056459931..92789efc4199ed750b497776df7f37654ca39a76 100644 (file)
@@ -348,6 +348,8 @@ bool req_key_h(meshlink_handle_t *mesh, connection_t *c, const char *request) {
                size_t len = strlen(request);
                from->in_forward += len + SPTPS_OVERHEAD;
                to->out_forward += len + SPTPS_OVERHEAD;
+               mesh->self->in_forward += len + SPTPS_OVERHEAD;
+               mesh->self->out_forward += len + SPTPS_OVERHEAD;
 
                send_request(mesh, to->nexthop->connection, NULL, "%s", request);
        }
@@ -419,6 +421,8 @@ bool ans_key_h(meshlink_handle_t *mesh, connection_t *c, const char *request) {
                size_t len = strlen(request);
                from->in_forward += len + SPTPS_OVERHEAD;
                to->out_forward += len + SPTPS_OVERHEAD;
+               mesh->self->in_forward += len + SPTPS_OVERHEAD;
+               mesh->self->out_forward += len + SPTPS_OVERHEAD;
 
                /* Append the known UDP address of the from node, if we have a confirmed one */
                if(!*address && from->status.udp_confirmed && from->address.sa.sa_family != AF_UNSPEC) {
index c63c03732f4e39c8b93cff6233b105e3257ebefc..83d25e6eefcb9c66e6bb5506ae3f4c3c1e7f2501 100644 (file)
@@ -62,10 +62,12 @@ void route(meshlink_handle_t *mesh, node_t *source, vpn_packet_t *packet) {
        // Channel traffic accounting
        if(source == mesh->self) {
                dest->out_data += len + SPTPS_OVERHEAD;
+               mesh->self->out_data += len + SPTPS_OVERHEAD;
        }
 
        if(dest == mesh->self) {
                source->in_data += len + SPTPS_OVERHEAD;
+               mesh->self->in_data += len + SPTPS_OVERHEAD;
                const void *payload = packet->data + sizeof(*hdr);
 
                char hex[len * 2 + 1];