From e40d5bf3a0e030105334046319f377efbf3f06c4 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Wed, 29 Jul 2020 14:44:42 +0200 Subject: [PATCH] Add meshlink_set_channel_listen_cb(). The accept callback is called when the peer has already fully established a connection. The listen callback is called earlier, when there is no fully established channel yet. However, the listen callback itself does not get a channel handle, it can only make a decision based on the peer node and port number whether to accept the channel, and if so the accept callback will be called later. --- src/meshlink++.h | 38 +++++++++++++++++++++++++++++ src/meshlink.c | 22 ++++++++++++++++- src/meshlink.h | 42 +++++++++++++++++++++++++++++---- src/meshlink.sym | 1 + src/meshlink_internal.h | 1 + src/utcp.c | 10 ++++---- src/utcp.h | 6 ++--- src/utcp_priv.h | 2 +- test/channels-aio-fd.c | 11 --------- test/channels-aio.c | 11 --------- test/channels-cornercases.c | 12 ---------- test/channels-failure.c | 29 +++++++++++++++++++---- test/channels-fork.c | 12 ---------- test/channels-udp-cornercases.c | 12 ---------- test/channels.c | 11 --------- test/echo-fork.c | 12 ---------- 16 files changed, 131 insertions(+), 101 deletions(-) diff --git a/src/meshlink++.h b/src/meshlink++.h index eeb1cd02..e0a51066 100644 --- a/src/meshlink++.h +++ b/src/meshlink++.h @@ -79,6 +79,15 @@ typedef void (*duplicate_cb_t)(mesh *mesh, node *node); */ typedef void (*log_cb_t)(mesh *mesh, log_level_t level, const char *text); +/// A callback for listening for incoming channels. +/** @param mesh A handle which represents an instance of MeshLink. + * @param node A handle for the node that wants to open a channel. + * @param port The port number the peer wishes to connect to. + * + * @return This function should return true if the application listens for the incoming channel, false otherwise. + */ +typedef bool (*meshlink_channel_listen_cb_t)(struct meshlink_handle *mesh, struct meshlink_node *node, uint16_t port); + /// A callback for accepting incoming channels. /** @param mesh A handle which represents an instance of MeshLink. * @param channel A handle for the incoming channel. @@ -271,6 +280,25 @@ public: (void)peer; } + /// This functions is called to determine if we are listening for incoming channels. + /** + * The function is run in MeshLink's own thread. + * It is therefore important that the callback uses apprioriate methods (queues, pipes, locking, etc.) + * to pass data to or from the application's thread. + * The callback should also not block itself and return as quickly as possible. + * + * @param node A handle for the node that wants to open a channel. + * @param port The port number the peer wishes to connect to. + * + * @return This function should return true if the application accepts the incoming channel, false otherwise. + */ + virtual bool channel_listen(node *node, uint16_t port) { + /* by default accept all channels */ + (void)node; + (void)port; + return true; + } + /// This functions is called whenever another node attempts to open a channel to the local node. /** * If the channel is accepted, the poll_callback will be set to channel_poll and can be @@ -346,6 +374,7 @@ public: meshlink_set_node_duplicate_cb(handle, &node_duplicate_trampoline); meshlink_set_log_cb(handle, MESHLINK_DEBUG, &log_trampoline); meshlink_set_error_cb(handle, &error_trampoline); + meshlink_set_channel_listen_cb(handle, &channel_listen_trampoline); meshlink_set_channel_accept_cb(handle, &channel_accept_trampoline); meshlink_set_connection_try_cb(handle, &connection_try_trampoline); return meshlink_start(handle); @@ -1150,6 +1179,15 @@ private: that->connection_try(static_cast(peer)); } + static bool channel_listen_trampoline(meshlink_handle_t *handle, meshlink_node_t *node, uint16_t port) { + if(!(handle->priv)) { + return false; + } + + meshlink::mesh *that = static_cast(handle->priv); + return that->channel_listen(static_cast(node), port); + } + static bool channel_accept_trampoline(meshlink_handle_t *handle, meshlink_channel *channel, uint16_t port, const void *data, size_t len) { if(!(handle->priv)) { return false; diff --git a/src/meshlink.c b/src/meshlink.c index 0843cc0c..f949c2c7 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -3626,7 +3626,12 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { (void)port; node_t *n = utcp->priv; meshlink_handle_t *mesh = n->mesh; - return mesh->channel_accept_cb; + + if(mesh->channel_accept_cb && mesh->channel_listen_cb) { + return mesh->channel_listen_cb(mesh, (meshlink_node_t *)n, port); + } else { + return mesh->channel_accept_cb; + } } /* Finish one AIO buffer, return true if the channel is still open. */ @@ -3931,6 +3936,21 @@ void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *c pthread_mutex_unlock(&mesh->mutex); } +void meshlink_set_channel_listen_cb(meshlink_handle_t *mesh, meshlink_channel_listen_cb_t cb) { + if(!mesh) { + meshlink_errno = MESHLINK_EINVAL; + return; + } + + if(pthread_mutex_lock(&mesh->mutex) != 0) { + abort(); + } + + mesh->channel_listen_cb = cb; + + pthread_mutex_unlock(&mesh->mutex); +} + void meshlink_set_channel_accept_cb(meshlink_handle_t *mesh, meshlink_channel_accept_cb_t cb) { if(!mesh) { meshlink_errno = MESHLINK_EINVAL; diff --git a/src/meshlink.h b/src/meshlink.h index 8293da0f..09a0028d 100644 --- a/src/meshlink.h +++ b/src/meshlink.h @@ -1158,9 +1158,25 @@ bool meshlink_whitelist_by_name(struct meshlink_handle *mesh, const char *name) */ void meshlink_set_default_blacklist(struct meshlink_handle *mesh, bool blacklist); -/// A callback for accepting incoming channels. +/// A callback for listening for incoming channels. /** This function is called whenever a remote node wants to open a channel to the local node. - * The application then has to decide whether to accept or reject this channel. + * This callback should only make a decision whether to accept or reject this channel. + * The accept callback should be set to get a handle to the actual channel. + * + * The callback is run in MeshLink's own thread. + * It is therefore important that the callback return quickly and uses apprioriate methods (queues, pipes, locking, etc.) + * to hand any data over to the application's thread. + * + * @param mesh A handle which represents an instance of MeshLink. + * @param node A handle for the node that wants to open a channel. + * @param port The port number the peer wishes to connect to. + * + * @return This function should return true if the application accepts the incoming channel, false otherwise. + */ +typedef bool (*meshlink_channel_listen_cb_t)(struct meshlink_handle *mesh, struct meshlink_node *node, uint16_t port); + +/// A callback for accepting incoming channels. +/** This function is called whenever a remote node has opened a channel to the local node. * * The callback is run in MeshLink's own thread. * It is therefore important that the callback return quickly and uses apprioriate methods (queues, pipes, locking, etc.) @@ -1210,18 +1226,34 @@ typedef void (*meshlink_channel_receive_cb_t)(struct meshlink_handle *mesh, stru */ typedef void (*meshlink_channel_poll_cb_t)(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t len); +/// Set the listen callback. +/** This functions sets the callback that is called whenever another node wants to open a channel to the local node. + * The callback is run in MeshLink's own thread. + * It is therefore important that the callback uses apprioriate methods (queues, pipes, locking, etc.) + * to hand the data over to the application's thread. + * The callback should also not block itself and return as quickly as possible. + * + * If no listen or accept callbacks are set, incoming channels are rejected. + * + * \memberof meshlink_handle + * @param mesh A handle which represents an instance of MeshLink. + * @param cb A pointer to the function which will be called when another node want to open a channel. + * If a NULL pointer is given, the callback will be disabled. + */ +void meshlink_set_channel_listen_cb(struct meshlink_handle *mesh, meshlink_channel_listen_cb_t cb); + /// Set the accept callback. -/** This functions sets the callback that is called whenever another node sends data to the local node. +/** This functions sets the callback that is called whenever a remote node has opened a channel to the local node. * The callback is run in MeshLink's own thread. * It is therefore important that the callback uses apprioriate methods (queues, pipes, locking, etc.) * to hand the data over to the application's thread. * The callback should also not block itself and return as quickly as possible. * - * If no accept callback is set, incoming channels are rejected. + * If no listen or accept callbacks are set, incoming channels are rejected. * * \memberof meshlink_handle * @param mesh A handle which represents an instance of MeshLink. - * @param cb A pointer to the function which will be called when another node sends data to the local node. + * @param cb A pointer to the function which will be called when a new channel has been opened by a remote node. * If a NULL pointer is given, the callback will be disabled. */ void meshlink_set_channel_accept_cb(struct meshlink_handle *mesh, meshlink_channel_accept_cb_t cb); diff --git a/src/meshlink.sym b/src/meshlink.sym index 00f8fa38..ba79ce3f 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -68,6 +68,7 @@ meshlink_reset_timers meshlink_send meshlink_set_canonical_address meshlink_set_channel_accept_cb +meshlink_set_channel_listen_cb meshlink_set_channel_poll_cb meshlink_set_channel_rcvbuf meshlink_set_channel_receive_cb diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index 588c630a..ca2df2c5 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -137,6 +137,7 @@ struct meshlink_handle { meshlink_node_status_cb_t node_status_cb; meshlink_node_status_cb_t meta_status_cb; meshlink_node_pmtu_cb_t node_pmtu_cb; + meshlink_channel_listen_cb_t channel_listen_cb; meshlink_channel_accept_cb_t channel_accept_cb; meshlink_node_duplicate_cb_t node_duplicate_cb; meshlink_connection_try_cb_t connection_try_cb; diff --git a/src/utcp.c b/src/utcp.c index 7ac96c62..65e6be7a 100644 --- a/src/utcp.c +++ b/src/utcp.c @@ -1293,7 +1293,7 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { if(hdr.ctl & SYN && !(hdr.ctl & ACK) && utcp->accept) { // If we don't want to accept it, send a RST back - if((utcp->pre_accept && !utcp->pre_accept(utcp, hdr.dst))) { + if((utcp->listen && !utcp->listen(utcp, hdr.dst))) { len = 1; goto reset; } @@ -2158,7 +2158,7 @@ bool utcp_is_active(struct utcp *utcp) { return false; } -struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv) { +struct utcp *utcp_init(utcp_accept_t accept, utcp_listen_t listen, utcp_send_t send, void *priv) { if(!send) { errno = EFAULT; return NULL; @@ -2184,7 +2184,7 @@ struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_ } utcp->accept = accept; - utcp->pre_accept = pre_accept; + utcp->listen = listen; utcp->send = send; utcp->priv = priv; utcp->timeout = DEFAULT_USER_TIMEOUT; // sec @@ -2401,10 +2401,10 @@ void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) { } } -void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept) { +void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_listen_t listen) { if(utcp) { utcp->accept = accept; - utcp->pre_accept = pre_accept; + utcp->listen = listen; } } diff --git a/src/utcp.h b/src/utcp.h index c051ff1f..2c537014 100644 --- a/src/utcp.h +++ b/src/utcp.h @@ -54,7 +54,7 @@ struct utcp_connection; #define UTCP_TCP 3 #define UTCP_UDP 0 -typedef bool (*utcp_pre_accept_t)(struct utcp *utcp, uint16_t port); +typedef bool (*utcp_listen_t)(struct utcp *utcp, uint16_t port); typedef void (*utcp_accept_t)(struct utcp_connection *utcp_connection, uint16_t port); typedef void (*utcp_retransmit_t)(struct utcp_connection *connection); @@ -63,7 +63,7 @@ typedef ssize_t (*utcp_recv_t)(struct utcp_connection *connection, const void *d typedef void (*utcp_poll_t)(struct utcp_connection *connection, size_t len); -struct utcp *utcp_init(utcp_accept_t accept, utcp_pre_accept_t pre_accept, utcp_send_t send, void *priv); +struct utcp *utcp_init(utcp_accept_t accept, utcp_listen_t listen, utcp_send_t send, void *priv); void utcp_exit(struct utcp *utcp); struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t port, utcp_recv_t recv, void *priv, uint32_t flags); @@ -77,7 +77,7 @@ int utcp_shutdown(struct utcp_connection *connection, int how); struct timespec utcp_timeout(struct utcp *utcp); void utcp_set_recv_cb(struct utcp_connection *connection, utcp_recv_t recv); void utcp_set_poll_cb(struct utcp_connection *connection, utcp_poll_t poll); -void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_pre_accept_t pre_accept); +void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_listen_t listen); bool utcp_is_active(struct utcp *utcp); void utcp_abort_all_connections(struct utcp *utcp); diff --git a/src/utcp_priv.h b/src/utcp_priv.h index 197fd268..c9519458 100644 --- a/src/utcp_priv.h +++ b/src/utcp_priv.h @@ -177,7 +177,7 @@ struct utcp { // Callbacks utcp_accept_t accept; - utcp_pre_accept_t pre_accept; + utcp_listen_t listen; utcp_retransmit_t retransmit; utcp_send_t send; diff --git a/test/channels-aio-fd.c b/test/channels-aio-fd.c index 2a32c38a..677944b8 100644 --- a/test/channels-aio-fd.c +++ b/test/channels-aio-fd.c @@ -41,16 +41,6 @@ static void aio_fd_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, int set_sync_flag(&info->flag, true); } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { assert(port && port <= nchannels); assert(!data); @@ -115,7 +105,6 @@ int main(void) { // Set the callbacks. - meshlink_set_channel_accept_cb(mesh_a, reject_cb); meshlink_set_channel_accept_cb(mesh_b, accept_cb); // Start both instances diff --git a/test/channels-aio.c b/test/channels-aio.c index c43f1da1..25bcea1e 100644 --- a/test/channels-aio.c +++ b/test/channels-aio.c @@ -45,16 +45,6 @@ static void aio_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const v set_sync_flag(&info->flag, true); } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static void receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len) { (void)mesh; (void)channel; @@ -128,7 +118,6 @@ int main(void) { mesh_b->priv = in_infos; - meshlink_set_channel_accept_cb(mesh_a, reject_cb); meshlink_set_channel_accept_cb(mesh_b, accept_cb); // Start both instances diff --git a/test/channels-cornercases.c b/test/channels-cornercases.c index b8534030..ca0d9b67 100644 --- a/test/channels-cornercases.c +++ b/test/channels-cornercases.c @@ -40,16 +40,6 @@ static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, c meshlink_channel_close(mesh, channel); } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { (void)port; @@ -87,7 +77,6 @@ int main(void) { // Set the callbacks. - meshlink_set_channel_accept_cb(a, reject_cb); meshlink_set_channel_accept_cb(b, accept_cb); // Open a channel from a to b before starting the mesh. @@ -116,7 +105,6 @@ int main(void) { channel_opened.flag = false; open_meshlink_pair(&a, &b, "channels-cornercases"); - meshlink_set_channel_accept_cb(a, reject_cb); meshlink_set_channel_accept_cb(b, accept_cb); start_meshlink_pair(a, b); diff --git a/test/channels-failure.c b/test/channels-failure.c index defdca96..1ac50d31 100644 --- a/test/channels-failure.c +++ b/test/channels-failure.c @@ -12,6 +12,13 @@ #include "../src/meshlink.h" #include "utils.h" +static bool listen_cb(meshlink_handle_t *mesh, meshlink_node_t *node, uint16_t port) { + (void)mesh; + (void)node; + + return port == 7; +} + static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { (void)mesh; (void)channel; @@ -56,6 +63,7 @@ int main(void) { // Set the callbacks. + meshlink_set_channel_listen_cb(mesh_b, listen_cb); meshlink_set_channel_accept_cb(mesh_b, accept_cb); // Open a channel from a to b @@ -94,8 +102,8 @@ int main(void) { // Try setting up a new channel while b is still down. - set_sync_flag(&poll_flag, false); - set_sync_flag(&receive_flag, false); + reset_sync_flag(&poll_flag); + reset_sync_flag(&receive_flag); channel = meshlink_channel_open(mesh_a, b, 7, NULL, NULL, 0); assert(channel); @@ -107,15 +115,26 @@ int main(void) { meshlink_channel_close(mesh_a, channel); - // Restart b and create a new channel + // Restart b and create a new channel to the wrong port - set_sync_flag(&poll_flag, false); - set_sync_flag(&receive_flag, false); + reset_sync_flag(&poll_flag); + reset_sync_flag(&receive_flag); meshlink_set_node_channel_timeout(mesh_a, b, 60); assert(meshlink_start(mesh_b)); + channel = meshlink_channel_open(mesh_a, b, 42, receive_cb, NULL, 0); + meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb); + assert(channel); + assert(wait_sync_flag(&poll_flag, 10)); + assert(poll_len == 0); + meshlink_channel_close(mesh_a, channel); + + // Create a channel that will be accepted + + reset_sync_flag(&poll_flag); + channel = meshlink_channel_open(mesh_a, b, 7, receive_cb, NULL, 0); meshlink_set_channel_poll_cb(mesh_a, channel, poll_cb); assert(channel); diff --git a/test/channels-fork.c b/test/channels-fork.c index 38ae9b19..74aa84b7 100644 --- a/test/channels-fork.c +++ b/test/channels-fork.c @@ -51,16 +51,6 @@ static void bar_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, } } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { if(port != 7) { return false; @@ -113,8 +103,6 @@ static int main1(int rfd, int wfd) { assert(meshlink_import(mesh, indata)); - meshlink_set_channel_accept_cb(mesh, reject_cb); - assert(meshlink_start(mesh)); // Open a channel from foo to bar. diff --git a/test/channels-udp-cornercases.c b/test/channels-udp-cornercases.c index d70a6cd5..ac373be6 100644 --- a/test/channels-udp-cornercases.c +++ b/test/channels-udp-cornercases.c @@ -40,16 +40,6 @@ static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, c meshlink_channel_close(mesh, channel); } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { (void)port; @@ -89,7 +79,6 @@ int main(void) { // Set the callbacks. - meshlink_set_channel_accept_cb(a, reject_cb); meshlink_set_channel_accept_cb(b, accept_cb); // Open a channel from a to b before starting the mesh. @@ -122,7 +111,6 @@ int main(void) { reset_sync_flag(&b_closed); open_meshlink_pair(&a, &b, "channels-udp-cornercases"); - meshlink_set_channel_accept_cb(a, reject_cb); meshlink_set_channel_accept_cb(b, accept_cb); start_meshlink_pair(a, b); diff --git a/test/channels.c b/test/channels.c index fb8cdb0a..bc26ee59 100644 --- a/test/channels.c +++ b/test/channels.c @@ -35,16 +35,6 @@ static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, c assert(meshlink_channel_send(mesh, channel, data, len) == (ssize_t)len); } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { printf("accept_cb: (from %s on port %u) ", channel->node->name, (unsigned int)port); @@ -87,7 +77,6 @@ int main(void) { // Set the callbacks. - meshlink_set_channel_accept_cb(mesh_a, reject_cb); meshlink_set_channel_accept_cb(mesh_b, accept_cb); // Start both instances diff --git a/test/echo-fork.c b/test/echo-fork.c index 7fe0ab84..33966da1 100644 --- a/test/echo-fork.c +++ b/test/echo-fork.c @@ -50,16 +50,6 @@ static void b_receive_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, c assert(write(1, data, len) == (ssize_t)len); } -static bool reject_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { - (void)mesh; - (void)channel; - (void)port; - (void)data; - (void)len; - - return false; -} - static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint16_t port, const void *data, size_t len) { if(port != 7) { return false; @@ -89,8 +79,6 @@ static int main1(void) { meshlink_handle_t *mesh = meshlink_open("echo-fork_conf.1", "a", "echo-fork", DEV_CLASS_BACKBONE); assert(mesh); - meshlink_set_channel_accept_cb(mesh, reject_cb); - assert(meshlink_start(mesh)); // Open a channel. -- 2.39.5