From 0e03a8ad0de862ea431110d27d4659c082855f98 Mon Sep 17 00:00:00 2001 From: Guus Sliepen Date: Sun, 20 Jun 2021 22:14:18 +0200 Subject: [PATCH] Remove support for TCP channels and AIO. --- examples/channels.c | 37 +-- src/meshlink-tiny++.h | 113 --------- src/meshlink-tiny.h | 105 --------- src/meshlink.c | 389 +----------------------------- src/meshlink.sym | 6 - src/meshlink_internal.h | 17 -- src/utcp.c | 509 ++-------------------------------------- src/utcp.h | 19 +- src/utcp_priv.h | 10 - 9 files changed, 32 insertions(+), 1173 deletions(-) diff --git a/examples/channels.c b/examples/channels.c index e067ba0..d3df2d9 100644 --- a/examples/channels.c +++ b/examples/channels.c @@ -61,13 +61,6 @@ static bool channel_accept(meshlink_handle_t *mesh, meshlink_channel_t *channel, return true; } -static void channel_poll(meshlink_handle_t *mesh, meshlink_channel_t *channel, size_t len) { - (void)len; - - fprintf(stderr, "Channel to '%s' connected\n", channel->node->name); - meshlink_set_channel_poll_cb(mesh, channel, NULL); -} - static void node_status(meshlink_handle_t *mesh, meshlink_node_t *node, bool reachable) { (void)mesh; @@ -78,9 +71,6 @@ static void node_status(meshlink_handle_t *mesh, meshlink_node_t *node, bool rea } } -static meshlink_node_t **nodes; -static size_t nnodes; - static void parse_command(meshlink_handle_t *mesh, char *buf) { char *arg = strchr(buf, ' '); @@ -106,30 +96,6 @@ static void parse_command(meshlink_handle_t *mesh, char *buf) { fprintf(stderr, "Could not restart MeshLink: %s\n", meshlink_strerror(meshlink_errno)); exit(1); } - } else if(!strcasecmp(buf, "who")) { - if(!arg) { - nodes = meshlink_get_all_nodes(mesh, nodes, &nnodes); - - if(!nnodes) { - fprintf(stderr, "Could not get list of nodes: %s\n", meshlink_strerror(meshlink_errno)); - } else { - printf("%zu known nodes:", nnodes); - - for(size_t i = 0; i < nnodes; i++) { - printf(" %s", nodes[i]->name); - } - - printf("\n"); - } - } else { - meshlink_node_t *node = meshlink_get_node(mesh, arg); - - if(!node) { - fprintf(stderr, "Error looking up '%s': %s\n", arg, meshlink_strerror(meshlink_errno)); - } else { - printf("Node %s found\n", arg); - } - } } else if(!strcasecmp(buf, "quit")) { printf("Bye!\n"); fclose(stdin); @@ -212,7 +178,7 @@ static void parse_input(meshlink_handle_t *mesh, char *buf) { if(!channel) { fprintf(stderr, "Opening chat channel to '%s'\n", destination->name); - channel = meshlink_channel_open(mesh, destination, CHAT_PORT, channel_receive, NULL, 0); + channel = meshlink_channel_open_ex(mesh, destination, CHAT_PORT, channel_receive, NULL, 0, MESHLINK_CHANNEL_UDP); if(!channel) { fprintf(stderr, "Could not create channel to '%s': %s\n", destination->name, meshlink_strerror(meshlink_errno)); @@ -220,7 +186,6 @@ static void parse_input(meshlink_handle_t *mesh, char *buf) { } destination->priv = channel; - meshlink_set_channel_poll_cb(mesh, channel, channel_poll); } if(!meshlink_channel_send(mesh, channel, msg, strlen(msg))) { diff --git a/src/meshlink-tiny++.h b/src/meshlink-tiny++.h index 238931e..d916877 100644 --- a/src/meshlink-tiny++.h +++ b/src/meshlink-tiny++.h @@ -106,29 +106,6 @@ typedef void (*channel_receive_cb_t)(mesh *mesh, channel *channel, const void *d */ typedef void (*channel_poll_cb_t)(mesh *mesh, channel *channel, size_t len); -/// A callback for cleaning up buffers submitted for asynchronous I/O. -/** This callbacks signals that MeshLink has finished using this buffer. - * The ownership of the buffer is now back into the application's hands. - * - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel which used this buffer. - * @param data A pointer to a buffer containing the enqueued data. - * @param len The length of the buffer. - * @param priv A private pointer which was set by the application when submitting the buffer. - */ -typedef void (*aio_cb_t)(mesh *mesh, channel *channel, const void *data, size_t len, void *priv); - -/// A callback for asynchronous I/O to and from filedescriptors. -/** This callbacks signals that MeshLink has finished using this filedescriptor. - * - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel which used this filedescriptor. - * @param fd The filedescriptor that was used. - * @param len The length of the data that was successfully sent or received. - * @param priv A private pointer which was set by the application when submitting the buffer. - */ -typedef void (*aio_fd_cb_t)(mesh *mesh, channel *channel, int fd, size_t len, void *priv); - /// A class describing a MeshLink node. class node: public meshlink_node_t { }; @@ -584,21 +561,6 @@ public: return meshlink_forget_node(handle, node); } - /// Set the poll callback. - /** This functions sets the callback that is called whenever data can be sent to another node. - * The callback is run in MeshLink's own thread. - * It is therefore important that the callback uses apprioriate methods (queues, pipes, locking, etc.) - * to pass data to or from the application's thread. - * The callback should also not block itself and return as quickly as possible. - * - * @param channel A handle for the channel. - * @param cb A pointer to the function which will be called when data can be sent to another node. - * If a NULL pointer is given, the callback will be disabled. - */ - void set_channel_poll_cb(channel *channel, channel_poll_cb_t cb) { - meshlink_set_channel_poll_cb(handle, channel, (meshlink_channel_poll_cb_t)cb); - } - /// Set the send buffer size of a channel. /** This function sets the desired size of the send buffer. * The default size is 128 kB. @@ -691,7 +653,6 @@ public: */ channel *channel_open(node *node, uint16_t port, channel_receive_cb_t cb, const void *data, size_t len, uint32_t flags = channel::TCP) { channel *ch = (channel *)meshlink_channel_open_ex(handle, node, port, (meshlink_channel_receive_cb_t)cb, data, len, flags); - meshlink_set_channel_poll_cb(handle, ch, &channel_poll_trampoline); return ch; } @@ -716,7 +677,6 @@ public: */ channel *channel_open(node *node, uint16_t port, const void *data, size_t len, uint32_t flags = channel::TCP) { channel *ch = (channel *)meshlink_channel_open_ex(handle, node, port, &channel_receive_trampoline, data, len, flags); - meshlink_set_channel_poll_cb(handle, ch, &channel_poll_trampoline); return ch; } @@ -776,78 +736,6 @@ public: return meshlink_channel_send(handle, channel, data, len); } - /// Transmit data on a channel asynchronously - /** This registers a buffer that will be used to send data to the remote node. - * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered. - * While there are still buffers with unsent data, the poll callback will not be called. - * - * @param channel A handle for the channel. - * @param data A pointer to a buffer containing data sent by the source, or NULL if there is no data to send. - * After meshlink_channel_aio_send() returns, the buffer may not be modified or freed by the application - * until the callback routine is called. - * @param len The length of the data, or 0 if there is no data to send. - * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. - * @param priv A private pointer which was set by the application when submitting the buffer. - * - * @return True if the buffer was enqueued, false otherwise. - */ - bool channel_aio_send(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { - return meshlink_channel_aio_send(handle, channel, data, len, cb, priv); - } - - /// Transmit data on a channel asynchronously from a filedescriptor - /** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel. - * The callback may be returned early if there is an error reading from the filedescriptor. - * While there is still with unsent data, the poll callback will not be called. - * - * @param channel A handle for the channel. - * @param fd A file descriptor from which data will be read. - * @param len The length of the data, or 0 if there is no data to send. - * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. - * @param priv A private pointer which was set by the application when submitting the buffer. - * - * @return True if the buffer was enqueued, false otherwise. - */ - bool channel_aio_fd_send(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { - return meshlink_channel_aio_fd_send(handle, channel, fd, len, cb, priv); - } - - /// Receive data on a channel asynchronously - /** This registers a buffer that will be filled with incoming channel data. - * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered. - * While there are still buffers that have not been filled, the receive callback will not be called. - * - * @param channel A handle for the channel. - * @param data A pointer to a buffer that will be filled with incoming data. - * After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application - * until the callback routine is called. - * @param len The length of the data. - * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. - * @param priv A private pointer which was set by the application when submitting the buffer. - * - * @return True if the buffer was enqueued, false otherwise. - */ - bool channel_aio_receive(channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { - return meshlink_channel_aio_receive(handle, channel, data, len, cb, priv); - } - - /// Receive data on a channel asynchronously and send it to a filedescriptor - /** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor. - * The callback may be returned early if there is an error writing to the filedescriptor. - * While there is still unread data, the receive callback will not be called. - * - * @param channel A handle for the channel. - * @param fd A file descriptor to which data will be written. - * @param len The length of the data. - * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. - * @param priv A private pointer which was set by the application when submitting the buffer. - * - * @return True if the buffer was enqueued, false otherwise. - */ - bool channel_aio_fd_receive(channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { - return meshlink_channel_aio_fd_receive(handle, channel, fd, len, cb, priv); - } - /// Get the amount of bytes in the send buffer. /** This returns the amount of bytes in the send buffer. * These bytes have not been received by the peer yet. @@ -1015,7 +903,6 @@ private: if(accepted) { meshlink_set_channel_receive_cb(handle, channel, &channel_receive_trampoline); - meshlink_set_channel_poll_cb(handle, channel, &channel_poll_trampoline); } return accepted; diff --git a/src/meshlink-tiny.h b/src/meshlink-tiny.h index 10fec47..ad32802 100644 --- a/src/meshlink-tiny.h +++ b/src/meshlink-tiny.h @@ -892,16 +892,6 @@ typedef bool (*meshlink_channel_accept_cb_t)(struct meshlink_handle *mesh, struc */ typedef void (*meshlink_channel_receive_cb_t)(struct meshlink_handle *mesh, struct meshlink_channel *channel, const void *data, size_t len); -/// A callback informing the application when data can be sent on a channel. -/** This function is called whenever there is enough free buffer space so a call to meshlink_channel_send() will succeed. - * - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel. - * @param len The maximum amount of data that is guaranteed to be accepted by meshlink_channel_send(), - * or 0 in case of an error. - */ -typedef void (*meshlink_channel_poll_cb_t)(struct meshlink_handle *mesh, struct meshlink_channel *channel, size_t len); - /// Set the listen callback. /** This functions sets the callback that is called whenever another node wants to open a channel to the local node. * The callback is run in MeshLink's own thread. @@ -1136,101 +1126,6 @@ void meshlink_channel_abort(struct meshlink_handle *mesh, struct meshlink_channe */ ssize_t meshlink_channel_send(struct meshlink_handle *mesh, struct meshlink_channel *channel, const void *data, size_t len) __attribute__((__warn_unused_result__)); -/// A callback for cleaning up buffers submitted for asynchronous I/O. -/** This callbacks signals that MeshLink has finished using this buffer. - * The ownership of the buffer is now back into the application's hands. - * - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel which used this buffer. - * @param data A pointer to a buffer containing the enqueued data. - * @param len The length of the buffer. - * @param priv A private pointer which was set by the application when submitting the buffer. - */ -typedef void (*meshlink_aio_cb_t)(struct meshlink_handle *mesh, struct meshlink_channel *channel, const void *data, size_t len, void *priv); - -/// A callback for asynchronous I/O to and from filedescriptors. -/** This callbacks signals that MeshLink has finished using this filedescriptor. - * - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel which used this filedescriptor. - * @param fd The filedescriptor that was used. - * @param len The length of the data that was successfully sent or received. - * @param priv A private pointer which was set by the application when submitting the buffer. - */ -typedef void (*meshlink_aio_fd_cb_t)(struct meshlink_handle *mesh, struct meshlink_channel *channel, int fd, size_t len, void *priv); - -/// Transmit data on a channel asynchronously -/** This registers a buffer that will be used to send data to the remote node. - * Multiple buffers can be registered, in which case data will be sent in the order the buffers were registered. - * While there are still buffers with unsent data, the poll callback will not be called. - * - * \memberof meshlink_channel - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel. - * @param data A pointer to a buffer containing data sent by the source, or NULL if there is no data to send. - * After meshlink_channel_aio_send() returns, the buffer may not be modified or freed by the application - * until the callback routine is called. - * @param len The length of the data, or 0 if there is no data to send. - * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. - * @param priv A private pointer which is passed unchanged to the callback. - * - * @return True if the buffer was enqueued, false otherwise. - */ -bool meshlink_channel_aio_send(struct meshlink_handle *mesh, struct meshlink_channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) __attribute__((__warn_unused_result__)); - -/// Transmit data on a channel asynchronously from a filedescriptor -/** This will read up to the specified length number of bytes from the given filedescriptor, and send it over the channel. - * The callback may be returned early if there is an error reading from the filedescriptor. - * While there is still with unsent data, the poll callback will not be called. - * - * \memberof meshlink_channel - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel. - * @param fd A file descriptor from which data will be read. - * @param len The length of the data, or 0 if there is no data to send. - * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. - * @param priv A private pointer which is passed unchanged to the callback. - * - * @return True if the buffer was enqueued, false otherwise. - */ -bool meshlink_channel_aio_fd_send(struct meshlink_handle *mesh, struct meshlink_channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) __attribute__((__warn_unused_result__)); - -/// Receive data on a channel asynchronously -/** This registers a buffer that will be filled with incoming channel data. - * Multiple buffers can be registered, in which case data will be received in the order the buffers were registered. - * While there are still buffers that have not been filled, the receive callback will not be called. - * - * \memberof meshlink_channel - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel. - * @param data A pointer to a buffer that will be filled with incoming data. - * After meshlink_channel_aio_receive() returns, the buffer may not be modified or freed by the application - * until the callback routine is called. - * @param len The length of the data. - * @param cb A pointer to the function which will be called when MeshLink has finished using the buffer. - * @param priv A private pointer which is passed unchanged to the callback. - * - * @return True if the buffer was enqueued, false otherwise. - */ -bool meshlink_channel_aio_receive(struct meshlink_handle *mesh, struct meshlink_channel *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) __attribute__((__warn_unused_result__)); - -/// Receive data on a channel asynchronously and send it to a filedescriptor -/** This will read up to the specified length number of bytes from the channel, and send it to the filedescriptor. - * The callback may be returned early if there is an error writing to the filedescriptor. - * While there is still unread data, the receive callback will not be called. - * - * \memberof meshlink_channel - * @param mesh A handle which represents an instance of MeshLink. - * @param channel A handle for the channel. - * @param fd A file descriptor to which data will be written. - * @param len The length of the data. - * @param cb A pointer to the function which will be called when MeshLink has finished using the filedescriptor. - * @param priv A private pointer which was set by the application when submitting the buffer. - * - * @return True if the buffer was enqueued, false otherwise. - */ -bool meshlink_channel_aio_fd_receive(struct meshlink_handle *mesh, struct meshlink_channel *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) __attribute__((__warn_unused_result__)); - /// Get channel flags. /** This returns the flags used when opening this channel. * diff --git a/src/meshlink.c b/src/meshlink.c index 89a146e..64580cf 100644 --- a/src/meshlink.c +++ b/src/meshlink.c @@ -2258,48 +2258,6 @@ static bool channel_pre_accept(struct utcp *utcp, uint16_t port) { } } -/* Finish one AIO buffer, return true if the channel is still open. */ -static bool aio_finish_one(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { - meshlink_aio_buffer_t *aio = *head; - *head = aio->next; - - if(channel->c) { - channel->in_callback = true; - - if(aio->data) { - if(aio->cb.buffer) { - aio->cb.buffer(mesh, channel, aio->data, aio->done, aio->priv); - } - } else { - if(aio->cb.fd) { - aio->cb.fd(mesh, channel, aio->fd, aio->done, aio->priv); - } - } - - channel->in_callback = false; - - if(!channel->c) { - free(aio); - free(channel); - return false; - } - } - - free(aio); - return true; -} - -/* Finish all AIO buffers, return true if the channel is still open. */ -static bool aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **head) { - while(*head) { - if(!aio_finish_one(mesh, channel, head)) { - return false; - } - } - - return true; -} - static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) { meshlink_channel_t *channel = connection->priv; @@ -2318,61 +2276,6 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data const char *p = data; size_t left = len; - while(channel->aio_receive) { - if(!len) { - /* This receive callback signalled an error, abort all outstanding AIO buffers. */ - if(!aio_abort(mesh, channel, &channel->aio_receive)) { - return len; - } - - break; - } - - meshlink_aio_buffer_t *aio = channel->aio_receive; - size_t todo = aio->len - aio->done; - - if(todo > left) { - todo = left; - } - - if(aio->data) { - memcpy((char *)aio->data + aio->done, p, todo); - } else { - ssize_t result = write(aio->fd, p, todo); - - if(result <= 0) { - if(result < 0 && errno == EINTR) { - continue; - } - - /* Writing to fd failed, cancel just this AIO buffer. */ - logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno)); - - if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { - return len; - } - - continue; - } - - todo = result; - } - - aio->done += todo; - p += todo; - left -= todo; - - if(aio->done == aio->len) { - if(!aio_finish_one(mesh, channel, &channel->aio_receive)) { - return len; - } - } - - if(!left) { - return len; - } - } - if(channel->receive_cb) { channel->receive_cb(mesh, channel, p, left); } @@ -2437,119 +2340,6 @@ static void channel_receive(meshlink_handle_t *mesh, meshlink_node_t *source, co utcp_recv(n->utcp, data, len); } -static void channel_poll(struct utcp_connection *connection, size_t len) { - meshlink_channel_t *channel = connection->priv; - - if(!channel) { - abort(); - } - - node_t *n = channel->node; - meshlink_handle_t *mesh = n->mesh; - - while(channel->aio_send) { - if(!len) { - /* This poll callback signalled an error, abort all outstanding AIO buffers. */ - if(!aio_abort(mesh, channel, &channel->aio_send)) { - return; - } - - break; - } - - /* We have at least one AIO buffer. Send as much as possible from the buffers. */ - meshlink_aio_buffer_t *aio = channel->aio_send; - size_t todo = aio->len - aio->done; - ssize_t sent; - - if(todo > len) { - todo = len; - } - - if(aio->data) { - sent = utcp_send(connection, (char *)aio->data + aio->done, todo); - } else { - /* Limit the amount we read at once to avoid stack overflows */ - if(todo > 65536) { - todo = 65536; - } - - char buf[todo]; - ssize_t result = read(aio->fd, buf, todo); - - if(result > 0) { - todo = result; - sent = utcp_send(connection, buf, todo); - } else { - if(result < 0 && errno == EINTR) { - continue; - } - - /* Reading from fd failed, cancel just this AIO buffer. */ - if(result != 0) { - logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno)); - } - - if(!aio_finish_one(mesh, channel, &channel->aio_send)) { - return; - } - - continue; - } - } - - if(sent != (ssize_t)todo) { - /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */ - if(!aio_abort(mesh, channel, &channel->aio_send)) { - return; - } - - len = 0; - break; - } - - aio->done += sent; - len -= sent; - - /* If we didn't finish this buffer, exit early. */ - if(aio->done < aio->len) { - return; - } - - /* Signal completion of this buffer, and go to the next one. */ - if(!aio_finish_one(mesh, channel, &channel->aio_send)) { - return; - } - - if(!len) { - return; - } - } - - if(channel->poll_cb) { - channel->poll_cb(mesh, channel, len); - } else { - utcp_set_poll_cb(connection, NULL); - } -} - -void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) { - logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_poll_cb(%p, %p)", (void *)channel, (void *)(intptr_t)cb); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return; - } - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - channel->poll_cb = cb; - utcp_set_poll_cb(channel->c, (cb || channel->aio_send) ? channel_poll : NULL); - pthread_mutex_unlock(&mesh->mutex); -} - void meshlink_set_channel_listen_cb(meshlink_handle_t *mesh, meshlink_channel_listen_cb_t cb) { logger(mesh, MESHLINK_DEBUG, "meshlink_set_channel_listen_cb(%p)", (void *)(intptr_t)cb); @@ -2743,10 +2533,6 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel if(channel->c) { utcp_close(channel->c); channel->c = NULL; - - /* Clean up any outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); - aio_abort(mesh, channel, &channel->aio_receive); } if(!channel->in_callback) { @@ -2771,10 +2557,6 @@ void meshlink_channel_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel if(channel->c) { utcp_abort(channel->c); channel->c = NULL; - - /* Clean up any outstanding AIO buffers. */ - aio_abort(mesh, channel, &channel->aio_send); - aio_abort(mesh, channel, &channel->aio_receive); } if(!channel->in_callback) { @@ -2812,12 +2594,7 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann abort(); } - /* Disallow direct calls to utcp_send() while we still have AIO active. */ - if(channel->aio_send) { - retval = 0; - } else { - retval = utcp_send(channel->c, data, len); - } + retval = utcp_send(channel->c, data, len); pthread_mutex_unlock(&mesh->mutex); @@ -2828,170 +2605,6 @@ ssize_t meshlink_channel_send(meshlink_handle_t *mesh, meshlink_channel_t *chann return retval; } -bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_send(%p, %p, %zu, %p, %p)", (void *)channel, data, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || !data) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->data = data; - aio->len = len; - aio->cb.buffer = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_send; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - /* Ensure the poll callback is set, and call it right now to push data if possible */ - utcp_set_poll_cb(channel->c, channel_poll); - size_t todo = MIN(len, utcp_get_rcvbuf_free(channel->c)); - - if(todo) { - channel_poll(channel->c, todo); - } - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_fd_send(%p, %d, %zu, %p, %p)", (void *)channel, fd, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || fd == -1) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->fd = fd; - aio->len = len; - aio->cb.fd = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_send; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - /* Ensure the poll callback is set, and call it right now to push data if possible */ - utcp_set_poll_cb(channel->c, channel_poll); - size_t left = utcp_get_rcvbuf_free(channel->c); - - if(left) { - channel_poll(channel->c, left); - } - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -bool meshlink_channel_aio_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, const void *data, size_t len, meshlink_aio_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_receive(%p, %p, %zu, %p, %p)", (void *)channel, data, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || !data) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->data = data; - aio->len = len; - aio->cb.buffer = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_receive; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - -bool meshlink_channel_aio_fd_receive(meshlink_handle_t *mesh, meshlink_channel_t *channel, int fd, size_t len, meshlink_aio_fd_cb_t cb, void *priv) { - logger(mesh, MESHLINK_DEBUG, "meshlink_channel_aio_fd_receive(%p, %d, %zu, %p, %p)", (void *)channel, fd, len, (void *)(intptr_t)cb, priv); - - if(!mesh || !channel) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - if(!len || fd == -1) { - meshlink_errno = MESHLINK_EINVAL; - return false; - } - - meshlink_aio_buffer_t *aio = xzalloc(sizeof(*aio)); - aio->fd = fd; - aio->len = len; - aio->cb.fd = cb; - aio->priv = priv; - - if(pthread_mutex_lock(&mesh->mutex) != 0) { - abort(); - } - - /* Append the AIO buffer descriptor to the end of the chain */ - meshlink_aio_buffer_t **p = &channel->aio_receive; - - while(*p) { - p = &(*p)->next; - } - - *p = aio; - - pthread_mutex_unlock(&mesh->mutex); - - return true; -} - uint32_t meshlink_channel_get_flags(meshlink_handle_t *mesh, meshlink_channel_t *channel) { if(!mesh || !channel) { meshlink_errno = MESHLINK_EINVAL; diff --git a/src/meshlink.sym b/src/meshlink.sym index 1389360..6806917 100644 --- a/src/meshlink.sym +++ b/src/meshlink.sym @@ -6,16 +6,11 @@ devtool_set_meta_status_cb devtool_set_inviter_commits_first devtool_trybind_probe meshlink_channel_abort -meshlink_channel_aio_fd_receive -meshlink_channel_aio_fd_send -meshlink_channel_aio_receive -meshlink_channel_aio_send meshlink_channel_close meshlink_channel_get_flags meshlink_channel_get_mss meshlink_channel_get_recvq meshlink_channel_get_sendq -meshlink_channel_open meshlink_channel_open_ex meshlink_channel_send meshlink_channel_shutdown @@ -56,7 +51,6 @@ meshlink_set_canonical_address meshlink_set_channel_accept_cb meshlink_set_channel_flags meshlink_set_channel_listen_cb -meshlink_set_channel_poll_cb meshlink_set_channel_rcvbuf meshlink_set_channel_rcvbuf_storage meshlink_set_channel_receive_cb diff --git a/src/meshlink_internal.h b/src/meshlink_internal.h index c92af1b..56c810f 100644 --- a/src/meshlink_internal.h +++ b/src/meshlink_internal.h @@ -150,20 +150,6 @@ struct meshlink_node { void *priv; }; -/// An AIO buffer. -typedef struct meshlink_aio_buffer { - const void *data; - int fd; - size_t len; - size_t done; - union { - meshlink_aio_cb_t buffer; - meshlink_aio_fd_cb_t fd; - } cb; - void *priv; - struct meshlink_aio_buffer *next; -} meshlink_aio_buffer_t; - /// A channel. struct meshlink_channel { struct node_t *node; @@ -171,10 +157,7 @@ struct meshlink_channel { bool in_callback; struct utcp_connection *c; - meshlink_aio_buffer_t *aio_send; - meshlink_aio_buffer_t *aio_receive; meshlink_channel_receive_cb_t receive_cb; - meshlink_channel_poll_cb_t poll_cb; }; /// Header for data packets routed between nodes diff --git a/src/utcp.c b/src/utcp.c index ca91bee..dc30fe6 100644 --- a/src/utcp.c +++ b/src/utcp.c @@ -182,10 +182,6 @@ static bool fin_wanted(struct utcp_connection *c, uint32_t seq) { } } -static bool is_reliable(struct utcp_connection *c) { - return c->flags & UTCP_RELIABLE; -} - static int32_t seqdiff(uint32_t a, uint32_t b) { return a - b; } @@ -322,53 +318,6 @@ static ssize_t buffer_copy(struct buffer *buf, void *data, size_t offset, size_t return len; } -// Copy data from the buffer without removing it. -static ssize_t buffer_call(struct utcp_connection *c, struct buffer *buf, size_t offset, size_t len) { - if(!c->recv) { - return len; - } - - // Ensure we don't copy more than is actually stored in the buffer - if(offset >= buf->used) { - return 0; - } - - if(buf->used - offset < len) { - len = buf->used - offset; - } - - uint32_t realoffset = buf->offset + offset; - - if(buf->size - buf->offset <= offset) { - // The offset wrapped - realoffset -= buf->size; - } - - if(buf->size - realoffset < len) { - // The data is wrapped - ssize_t rx1 = c->recv(c, buf->data + realoffset, buf->size - realoffset); - - if(rx1 < buf->size - realoffset) { - return rx1; - } - - // The channel might have been closed by the previous callback - if(!c->recv) { - return len; - } - - ssize_t rx2 = c->recv(c, buf->data, len - (buf->size - realoffset)); - - if(rx2 < 0) { - return rx2; - } else { - return rx1 + rx2; - } - } else { - return c->recv(c, buf->data + realoffset, len); - } -} - // Discard data from the buffer. static ssize_t buffer_discard(struct buffer *buf, size_t len) { if(buf->used < len) { @@ -702,7 +651,7 @@ struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_re return NULL; } - assert((flags & ~0x1f) == 0); + assert(flags == 0); // UDP only c->flags = flags; c->recv = recv; @@ -738,10 +687,6 @@ struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t dst, utcp_re return c; } -struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t dst, utcp_recv_t recv, void *priv) { - return utcp_connect_ex(utcp, dst, recv, priv, UTCP_TCP); -} - void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { if(c->reapable || c->state != SYN_RECEIVED) { debug(c, "accept() called on invalid connection in state %s\n", c, strstate[c->state]); @@ -751,13 +696,12 @@ void utcp_accept(struct utcp_connection *c, utcp_recv_t recv, void *priv) { debug(c, "accepted %p %p\n", c, recv, priv); c->recv = recv; c->priv = priv; - c->do_poll = true; set_state(c, ESTABLISHED); } static void ack(struct utcp_connection *c, bool sendatleastone) { int32_t left = seqdiff(c->snd.last, c->snd.nxt); - int32_t cwndleft = is_reliable(c) ? min(c->snd.cwnd, c->snd.wnd) - seqdiff(c->snd.nxt, c->snd.una) : MAX_UNRELIABLE_SIZE; + int32_t cwndleft = MAX_UNRELIABLE_SIZE; assert(left >= 0); @@ -785,7 +729,7 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { pkt->hdr.src = c->src; pkt->hdr.dst = c->dst; pkt->hdr.ack = c->rcv.nxt; - pkt->hdr.wnd = is_reliable(c) ? c->rcvbuf.maxsize : 0; + pkt->hdr.wnd = 0; pkt->hdr.ctl = ACK; pkt->hdr.aux = 0; @@ -798,14 +742,6 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { c->snd.nxt += seglen; left -= seglen; - if(!is_reliable(c)) { - if(left) { - pkt->hdr.ctl |= MF; - } else { - pkt->hdr.ctl &= ~MF; - } - } - if(seglen && fin_wanted(c, c->snd.nxt)) { seglen--; pkt->hdr.ctl |= FIN; @@ -821,7 +757,7 @@ static void ack(struct utcp_connection *c, bool sendatleastone) { print_packet(c, "send", pkt, sizeof(pkt->hdr) + seglen); c->utcp->send(c->utcp, pkt, sizeof(pkt->hdr) + seglen); - if(left && !is_reliable(c)) { + if(left) { pkt->hdr.wnd += seglen; } } while(left); @@ -868,25 +804,9 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { return -1; } - // Check if we need to be able to buffer all data - - if(c->flags & UTCP_NO_PARTIAL) { - if(len > buffer_free(&c->sndbuf)) { - if(len > c->sndbuf.maxsize) { - errno = EMSGSIZE; - return -1; - } else { - errno = EWOULDBLOCK; - return 0; - } - } - } - // Add data to send buffer. - if(is_reliable(c)) { - len = buffer_put(&c->sndbuf, data, len); - } else if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { + if(c->state != SYN_SENT && c->state != SYN_RECEIVED) { if(len > MAX_UNRELIABLE_SIZE || buffer_put(&c->sndbuf, data, len) != (ssize_t)len) { errno = EMSGSIZE; return -1; @@ -896,12 +816,7 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { } if(len <= 0) { - if(is_reliable(c)) { - errno = EWOULDBLOCK; - return 0; - } else { - return len; - } + return len; } c->snd.last += len; @@ -914,19 +829,8 @@ ssize_t utcp_send(struct utcp_connection *c, const void *data, size_t len) { ack(c, false); - if(!is_reliable(c)) { - c->snd.una = c->snd.nxt = c->snd.last; - buffer_discard(&c->sndbuf, c->sndbuf.used); - } - - if(is_reliable(c) && !timespec_isset(&c->rtrx_timeout)) { - start_retransmit_timer(c); - } - - if(is_reliable(c) && !timespec_isset(&c->conn_timeout)) { - clock_gettime(UTCP_CLOCK, &c->conn_timeout); - c->conn_timeout.tv_sec += c->utcp->timeout; - } + c->snd.una = c->snd.nxt = c->snd.last; + buffer_discard(&c->sndbuf, c->sndbuf.used); return len; } @@ -937,51 +841,6 @@ static void swap_ports(struct hdr *hdr) { hdr->dst = tmp; } -static void fast_retransmit(struct utcp_connection *c) { - if(c->state == CLOSED || c->snd.last == c->snd.una) { - debug(c, "fast_retransmit() called but nothing to retransmit!\n"); - return; - } - - struct utcp *utcp = c->utcp; - - struct { - struct hdr hdr; - uint8_t data[]; - } *pkt = c->utcp->pkt; - - pkt->hdr.src = c->src; - pkt->hdr.dst = c->dst; - pkt->hdr.wnd = c->rcvbuf.maxsize; - pkt->hdr.aux = 0; - - switch(c->state) { - case ESTABLISHED: - case FIN_WAIT_1: - case CLOSE_WAIT: - case CLOSING: - case LAST_ACK: - // Send unacked data again. - pkt->hdr.seq = c->snd.una; - pkt->hdr.ack = c->rcv.nxt; - pkt->hdr.ctl = ACK; - uint32_t len = min(seqdiff(c->snd.last, c->snd.una), utcp->mss); - - if(fin_wanted(c, c->snd.una + len)) { - len--; - pkt->hdr.ctl |= FIN; - } - - buffer_copy(&c->sndbuf, pkt->data, 0, len); - print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len); - utcp->send(utcp, pkt, sizeof(pkt->hdr) + len); - break; - - default: - break; - } -} - static void retransmit(struct utcp_connection *c) { if(c->state == CLOSED || c->snd.last == c->snd.una) { debug(c, "retransmit() called but nothing to retransmit!\n"); @@ -991,10 +850,6 @@ static void retransmit(struct utcp_connection *c) { struct utcp *utcp = c->utcp; - if(utcp->retransmit) { - utcp->retransmit(c); - } - struct { struct hdr hdr; uint8_t data[]; @@ -1030,6 +885,8 @@ static void retransmit(struct utcp_connection *c) { break; case ESTABLISHED: + break; + case FIN_WAIT_1: case CLOSE_WAIT: case CLOSING: @@ -1043,19 +900,14 @@ static void retransmit(struct utcp_connection *c) { if(fin_wanted(c, c->snd.una + len)) { len--; pkt->hdr.ctl |= FIN; + } else { + break; } - // RFC 5681 slow start after timeout - uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una); - c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4 - c->snd.cwnd = utcp->mss; - debug_cwnd(c); + assert(len == 0); - buffer_copy(&c->sndbuf, pkt->data, 0, len); print_packet(c, "rtrx", pkt, sizeof(pkt->hdr) + len); utcp->send(utcp, pkt, sizeof(pkt->hdr) + len); - - c->snd.nxt = c->snd.una + len; break; case CLOSED: @@ -1084,149 +936,6 @@ cleanup: return; } -/* Update receive buffer and SACK entries after consuming data. - * - * Situation: - * - * |.....0000..1111111111.....22222......3333| - * |---------------^ - * - * 0..3 represent the SACK entries. The ^ indicates up to which point we want - * to remove data from the receive buffer. The idea is to substract "len" - * from the offset of all the SACK entries, and then remove/cut down entries - * that are shifted to before the start of the receive buffer. - * - * There are three cases: - * - the SACK entry is after ^, in that case just change the offset. - * - the SACK entry starts before and ends after ^, so we have to - * change both its offset and size. - * - the SACK entry is completely before ^, in that case delete it. - */ -static void sack_consume(struct utcp_connection *c, size_t len) { - debug(c, "sack_consume %lu\n", (unsigned long)len); - - if(len > c->rcvbuf.used) { - debug(c, "all SACK entries consumed\n"); - c->sacks[0].len = 0; - return; - } - - buffer_discard(&c->rcvbuf, len); - - for(int i = 0; i < NSACKS && c->sacks[i].len;) { - if(len < c->sacks[i].offset) { - c->sacks[i].offset -= len; - i++; - } else if(len < c->sacks[i].offset + c->sacks[i].len) { - c->sacks[i].len -= len - c->sacks[i].offset; - c->sacks[i].offset = 0; - i++; - } else { - if(i < NSACKS - 1) { - memmove(&c->sacks[i], &c->sacks[i + 1], (NSACKS - 1 - i) * sizeof(c->sacks)[i]); - c->sacks[NSACKS - 1].len = 0; - } else { - c->sacks[i].len = 0; - break; - } - } - } - - for(int i = 0; i < NSACKS && c->sacks[i].len; i++) { - debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); - } -} - -static void handle_out_of_order(struct utcp_connection *c, uint32_t offset, const void *data, size_t len) { - debug(c, "out of order packet, offset %u\n", offset); - // Packet loss or reordering occured. Store the data in the buffer. - ssize_t rxd = buffer_put_at(&c->rcvbuf, offset, data, len); - - if(rxd <= 0) { - debug(c, "packet outside receive buffer, dropping\n"); - return; - } - - if((size_t)rxd < len) { - debug(c, "packet partially outside receive buffer\n"); - len = rxd; - } - - // Make note of where we put it. - for(int i = 0; i < NSACKS; i++) { - if(!c->sacks[i].len) { // nothing to merge, add new entry - debug(c, "new SACK entry %d\n", i); - c->sacks[i].offset = offset; - c->sacks[i].len = rxd; - break; - } else if(offset < c->sacks[i].offset) { - if(offset + rxd < c->sacks[i].offset) { // insert before - if(!c->sacks[NSACKS - 1].len) { // only if room left - debug(c, "insert SACK entry at %d\n", i); - memmove(&c->sacks[i + 1], &c->sacks[i], (NSACKS - i - 1) * sizeof(c->sacks)[i]); - c->sacks[i].offset = offset; - c->sacks[i].len = rxd; - } else { - debug(c, "SACK entries full, dropping packet\n"); - } - - break; - } else { // merge - debug(c, "merge with start of SACK entry at %d\n", i); - c->sacks[i].offset = offset; - break; - } - } else if(offset <= c->sacks[i].offset + c->sacks[i].len) { - if(offset + rxd > c->sacks[i].offset + c->sacks[i].len) { // merge - debug(c, "merge with end of SACK entry at %d\n", i); - c->sacks[i].len = offset + rxd - c->sacks[i].offset; - // TODO: handle potential merge with next entry - } - - break; - } - } - - for(int i = 0; i < NSACKS && c->sacks[i].len; i++) { - debug(c, "SACK[%d] offset %u len %u\n", i, c->sacks[i].offset, c->sacks[i].len); - } -} - -static void handle_in_order(struct utcp_connection *c, const void *data, size_t len) { - if(c->recv) { - ssize_t rxd = c->recv(c, data, len); - - if(rxd != (ssize_t)len) { - // TODO: handle the application not accepting all data. - abort(); - } - } - - // Check if we can process out-of-order data now. - if(c->sacks[0].len && len >= c->sacks[0].offset) { - debug(c, "incoming packet len %lu connected with SACK at %u\n", (unsigned long)len, c->sacks[0].offset); - - if(len < c->sacks[0].offset + c->sacks[0].len) { - size_t offset = len; - len = c->sacks[0].offset + c->sacks[0].len; - size_t remainder = len - offset; - - ssize_t rxd = buffer_call(c, &c->rcvbuf, offset, remainder); - - if(rxd != (ssize_t)remainder) { - // TODO: handle the application not accepting all data. - abort(); - } - } - } - - if(c->rcvbuf.used) { - sack_consume(c, len); - } - - c->rcv.nxt += len; -} - static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { // Fast path for unfragmented packets if(!hdr->wnd && !(hdr->ctl & MF)) { @@ -1237,52 +946,12 @@ static void handle_unreliable(struct utcp_connection *c, const struct hdr *hdr, c->rcv.nxt = hdr->seq + len; return; } - - // Ensure reassembled packet are not larger than 64 kiB - if(hdr->wnd >= MAX_UNRELIABLE_SIZE || hdr->wnd + len > MAX_UNRELIABLE_SIZE) { - return; - } - - // Don't accept out of order fragments - if(hdr->wnd && hdr->seq != c->rcv.nxt) { - return; - } - - // Reset the receive buffer for the first fragment - if(!hdr->wnd) { - buffer_clear(&c->rcvbuf); - } - - ssize_t rxd = buffer_put_at(&c->rcvbuf, hdr->wnd, data, len); - - if(rxd != (ssize_t)len) { - return; - } - - // Send the packet if it's the final fragment - if(!(hdr->ctl & MF)) { - buffer_call(c, &c->rcvbuf, 0, hdr->wnd + len); - } - - c->rcv.nxt = hdr->seq + len; } static void handle_incoming_data(struct utcp_connection *c, const struct hdr *hdr, const void *data, size_t len) { - if(!is_reliable(c)) { - handle_unreliable(c, hdr, data, len); - return; - } - - uint32_t offset = seqdiff(hdr->seq, c->rcv.nxt); - - if(offset) { - handle_out_of_order(c, offset, data, len); - } else { - handle_in_order(c, data, len); - } + handle_unreliable(c, hdr, data, len); } - ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { const uint8_t *ptr = data; @@ -1416,7 +1085,7 @@ ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len) { c->flags = init[3] & 0x7; } else { - c->flags = UTCP_TCP; + c->flags = UTCP_UDP; } synack: @@ -1497,53 +1166,14 @@ synack: break; } - // 1b. Discard data that is not in our receive window. - - if(is_reliable(c)) { - bool acceptable; - - if(c->state == SYN_SENT) { - acceptable = true; - } else if(len == 0) { - acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0; - } else { - int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); - - // cut already accepted front overlapping - if(rcv_offset < 0) { - acceptable = len > (size_t) - rcv_offset; - - if(acceptable) { - ptr -= rcv_offset; - len += rcv_offset; - hdr.seq -= rcv_offset; - } - } else { - acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize; - } - } - - if(!acceptable) { - debug(c, "packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize); - - // Ignore unacceptable RST packets. - if(hdr.ctl & RST) { - return 0; - } - - // Otherwise, continue processing. - len = 0; - } - } else { #if UTCP_DEBUG - int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); + int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt); - if(rcv_offset) { - debug(c, "packet out of order, offset %u bytes", rcv_offset); - } + if(rcv_offset) { + debug(c, "packet out of order, offset %u bytes", rcv_offset); + } #endif - } c->snd.wnd = hdr.wnd; // TODO: move below @@ -1551,10 +1181,8 @@ synack: // ackno should not roll back, and it should also not be bigger than what we ever could have sent // (= snd.una + c->sndbuf.used). - if(!is_reliable(c)) { - if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) { - hdr.ack = c->snd.una; - } + if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) { + hdr.ack = c->snd.una; } // 2. Handle RST packets @@ -1574,10 +1202,6 @@ synack: c->recv(c, NULL, 0); } - if(c->poll && !c->reapable) { - c->poll(c, 0); - } - return 0; case SYN_RECEIVED: @@ -1607,10 +1231,6 @@ synack: c->recv(c, NULL, 0); } - if(c->poll && !c->reapable) { - c->poll(c, 0); - } - return 0; case CLOSING: @@ -1692,10 +1312,6 @@ synack: if(data_acked) { buffer_discard(&c->sndbuf, data_acked); - - if(is_reliable(c)) { - c->do_poll = true; - } } // Also advance snd.nxt if possible @@ -1748,40 +1364,6 @@ synack: default: break; } - } else { - if(!len && is_reliable(c) && c->snd.una != c->snd.last) { - c->dupack++; - debug(c, "duplicate ACK %d\n", c->dupack); - - if(c->dupack == 3) { - // RFC 5681 fast recovery - debug(c, "fast recovery started\n", c->dupack); - uint32_t flightsize = seqdiff(c->snd.nxt, c->snd.una); - c->snd.ssthresh = max(flightsize / 2, utcp->mss * 2); // eq. 4 - c->snd.cwnd = min(c->snd.ssthresh + 3 * utcp->mss, c->sndbuf.maxsize); - - if(c->snd.cwnd > c->sndbuf.maxsize) { - c->snd.cwnd = c->sndbuf.maxsize; - } - - debug_cwnd(c); - - fast_retransmit(c); - } else if(c->dupack > 3) { - c->snd.cwnd += utcp->mss; - - if(c->snd.cwnd > c->sndbuf.maxsize) { - c->snd.cwnd = c->sndbuf.maxsize; - } - - debug_cwnd(c); - } - - // We got an ACK which indicates the other side did get one of our packets. - // Reset the retransmission timer to avoid going to slow start, - // but don't touch the connection timeout. - start_retransmit_timer(c); - } } // 4. Update timers @@ -1790,10 +1372,6 @@ synack: if(c->snd.una == c->snd.last) { stop_retransmit_timer(c); timespec_clear(&c->conn_timeout); - } else if(is_reliable(c)) { - start_retransmit_timer(c); - clock_gettime(UTCP_CLOCK, &c->conn_timeout); - c->conn_timeout.tv_sec += utcp->timeout; } } @@ -1816,7 +1394,6 @@ skip_ack: c->snd.last++; set_state(c, FIN_WAIT_1); } else { - c->do_poll = true; set_state(c, ESTABLISHED); } @@ -1905,7 +1482,7 @@ skip_ack: // 7. Process FIN stuff - if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) { + if(hdr.ctl & FIN) { switch(c->state) { case SYN_SENT: case SYN_RECEIVED: @@ -1960,7 +1537,7 @@ skip_ack: // - or we got an ack, so we should maybe send a bit more data // -> sendatleastone = false - if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) { + if(hdr.ctl & SYN || hdr.ctl & FIN) { ack(c, has_data); } @@ -2120,7 +1697,6 @@ static void set_reapable(struct utcp_connection *c) { set_buffer_storage(&c->rcvbuf, NULL, min(c->rcvbuf.maxsize, DEFAULT_MAXRCVBUFSIZE)); c->recv = NULL; - c->poll = NULL; c->reapable = true; } @@ -2144,11 +1720,6 @@ void utcp_reset_all_connections(struct utcp *utcp) { errno = 0; c->recv(c, NULL, 0); } - - if(c->poll && !c->reapable) { - errno = 0; - c->poll(c, 0); - } } return; @@ -2211,10 +1782,6 @@ struct timespec utcp_timeout(struct utcp *utcp) { c->recv(c, NULL, 0); } - if(c->poll && !c->reapable) { - c->poll(c, 0); - } - continue; } @@ -2223,19 +1790,6 @@ struct timespec utcp_timeout(struct utcp *utcp) { retransmit(c); } - if(c->poll) { - if((c->state == ESTABLISHED || c->state == CLOSE_WAIT) && c->do_poll) { - c->do_poll = false; - uint32_t len = buffer_free(&c->sndbuf); - - if(len) { - c->poll(c, len); - } - } else if(c->state == CLOSED) { - c->poll(c, 0); - } - } - if(timespec_isset(&c->conn_timeout) && timespec_lt(&c->conn_timeout, &next)) { next = c->conn_timeout; } @@ -2314,10 +1868,6 @@ void utcp_exit(struct utcp *utcp) { if(c->recv) { c->recv(c, NULL, 0); } - - if(c->poll && !c->reapable) { - c->poll(c, 0); - } } buffer_exit(&c->rcvbuf); @@ -2434,8 +1984,6 @@ void utcp_set_sndbuf(struct utcp_connection *c, void *data, size_t size) { } set_buffer_storage(&c->sndbuf, data, size); - - c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); } size_t utcp_get_rcvbuf(struct utcp_connection *c) { @@ -2496,13 +2044,6 @@ void utcp_set_recv_cb(struct utcp_connection *c, utcp_recv_t recv) { } } -void utcp_set_poll_cb(struct utcp_connection *c, utcp_poll_t poll) { - if(c) { - c->poll = poll; - c->do_poll = is_reliable(c) && buffer_free(&c->sndbuf); - } -} - void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_listen_t listen) { if(utcp) { utcp->accept = accept; @@ -2565,10 +2106,6 @@ void utcp_offline(struct utcp *utcp, bool offline) { } } -void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t cb) { - utcp->retransmit = cb; -} - void utcp_set_clock_granularity(long granularity) { CLOCK_GRANULARITY = granularity; } diff --git a/src/utcp.h b/src/utcp.h index 5de9364..1d1111c 100644 --- a/src/utcp.h +++ b/src/utcp.h @@ -45,30 +45,27 @@ struct utcp_connection; #define UTCP_SHUT_WR 1 #define UTCP_SHUT_RDWR 2 -#define UTCP_ORDERED 1 -#define UTCP_RELIABLE 2 -#define UTCP_FRAMED 4 -#define UTCP_DROP_LATE 8 -#define UTCP_NO_PARTIAL 16 +//#define UTCP_ORDERED 1 +//#define UTCP_RELIABLE 2 +//#define UTCP_FRAMED 4 +//#define UTCP_DROP_LATE 8 +//#define UTCP_NO_PARTIAL 16 -#define UTCP_TCP 3 +//#define UTCP_TCP 3 #define UTCP_UDP 0 -#define UTCP_CHANGEABLE_FLAGS 0x18U +#define UTCP_CHANGEABLE_FLAGS 0x0U typedef bool (*utcp_listen_t)(struct utcp *utcp, uint16_t port); typedef void (*utcp_accept_t)(struct utcp_connection *utcp_connection, uint16_t port); -typedef void (*utcp_retransmit_t)(struct utcp_connection *connection); typedef ssize_t (*utcp_send_t)(struct utcp *utcp, const void *data, size_t len); typedef ssize_t (*utcp_recv_t)(struct utcp_connection *connection, const void *data, size_t len); -typedef void (*utcp_poll_t)(struct utcp_connection *connection, size_t len); struct utcp *utcp_init(utcp_accept_t accept, utcp_listen_t listen, utcp_send_t send, void *priv); void utcp_exit(struct utcp *utcp); struct utcp_connection *utcp_connect_ex(struct utcp *utcp, uint16_t port, utcp_recv_t recv, void *priv, uint32_t flags); -struct utcp_connection *utcp_connect(struct utcp *utcp, uint16_t port, utcp_recv_t recv, void *priv); void utcp_accept(struct utcp_connection *utcp, utcp_recv_t recv, void *priv); ssize_t utcp_send(struct utcp_connection *connection, const void *data, size_t len); ssize_t utcp_recv(struct utcp *utcp, const void *data, size_t len); @@ -77,7 +74,6 @@ int utcp_abort(struct utcp_connection *connection); int utcp_shutdown(struct utcp_connection *connection, int how); struct timespec utcp_timeout(struct utcp *utcp); void utcp_set_recv_cb(struct utcp_connection *connection, utcp_recv_t recv); -void utcp_set_poll_cb(struct utcp_connection *connection, utcp_poll_t poll); void utcp_set_accept_cb(struct utcp *utcp, utcp_accept_t accept, utcp_listen_t listen); bool utcp_is_active(struct utcp *utcp); void utcp_reset_all_connections(struct utcp *utcp); @@ -94,7 +90,6 @@ void utcp_set_mtu(struct utcp *utcp, uint16_t mtu); void utcp_reset_timers(struct utcp *utcp); void utcp_offline(struct utcp *utcp, bool offline); -void utcp_set_retransmit_cb(struct utcp *utcp, utcp_retransmit_t retransmit); // Per-socket options diff --git a/src/utcp_priv.h b/src/utcp_priv.h index f96b92b..602140a 100644 --- a/src/utcp_priv.h +++ b/src/utcp_priv.h @@ -36,7 +36,6 @@ #define AUX_SAK 3 #define AUX_TIMESTAMP 4 -#define NSACKS 4 #define DEFAULT_SNDBUFSIZE 0 #define DEFAULT_MAXSNDBUFSIZE 131072 #define DEFAULT_RCVBUFSIZE 0 @@ -98,23 +97,16 @@ struct buffer { bool external; }; -struct sack { - uint32_t offset; - uint32_t len; -}; - struct utcp_connection { void *priv; struct utcp *utcp; uint32_t flags; bool reapable; - bool do_poll; // Callbacks utcp_recv_t recv; - utcp_poll_t poll; // TCP State @@ -158,7 +150,6 @@ struct utcp_connection { uint32_t prev_free; struct buffer sndbuf; struct buffer rcvbuf; - struct sack sacks[NSACKS]; // Per-socket options @@ -179,7 +170,6 @@ struct utcp { utcp_accept_t accept; utcp_listen_t listen; - utcp_retransmit_t retransmit; utcp_send_t send; // Packet buffer -- 2.39.2