]> git.meshlink.io Git - meshlink/commitdiff
Several fixes for channel AIO send and receive functions.
authorGuus Sliepen <guus@meshlink.io>
Tue, 28 Apr 2020 20:21:34 +0000 (22:21 +0200)
committerGuus Sliepen <guus@meshlink.io>
Tue, 28 Apr 2020 20:37:30 +0000 (22:37 +0200)
- Process multiple buffers if possible
- Better handling error conditions
  - fd errors now cancel the AIO buffer
  - channel errors cancel all outstanding AIO buffers
- Don't call the poll callback with a length larger than the remaining
  UTCP send buffer.

src/meshlink.c

index 4569c9f58fb2cdae484b3b316c1eba044daa636a..80bcc9704480afc84c2bbffe2203db85d2a93ac3 100644 (file)
@@ -3467,6 +3467,15 @@ static void aio_signal(meshlink_handle_t *mesh, meshlink_channel_t *channel, mes
        }
 }
 
+static void aio_abort(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_aio_buffer_t **aio) {
+       while(*aio) {
+               meshlink_aio_buffer_t *next = (*aio)->next;
+               aio_signal(mesh, channel, *aio);
+               free(*aio);
+               *aio = next;
+       }
+}
+
 static ssize_t channel_recv(struct utcp_connection *connection, const void *data, size_t len) {
        meshlink_channel_t *channel = connection->priv;
 
@@ -3486,6 +3495,12 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
        size_t left = len;
 
        while(channel->aio_receive) {
+               if(!len) {
+                       /* This receive callback signalled an error, abort all outstanding AIO buffers. */
+                       aio_abort(mesh, channel, &channel->aio_receive);
+                       break;
+               }
+
                meshlink_aio_buffer_t *aio = channel->aio_receive;
                size_t todo = aio->len - aio->done;
 
@@ -3498,12 +3513,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                } else {
                        ssize_t result = write(aio->fd, p, todo);
 
-                       if(result > 0) {
-                               todo = result;
+                       if(result <= 0) {
+                               /* Writing to fd failed, cancel just this AIO buffer. */
+                               logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
+                               channel->aio_receive = aio->next;
+                               aio_signal(mesh, channel, aio);
+                               free(aio);
+                               continue;
                        }
+
+                       todo = result;
                }
 
                aio->done += todo;
+               p += todo;
+               left -= todo;
 
                if(aio->done == aio->len) {
                        channel->aio_receive = aio->next;
@@ -3511,10 +3535,7 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        free(aio);
                }
 
-               p += todo;
-               left -= todo;
-
-               if(!left && len) {
+               if(!left) {
                        return len;
                }
        }
@@ -3601,57 +3622,79 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
        node_t *n = channel->node;
        meshlink_handle_t *mesh = n->mesh;
-       meshlink_aio_buffer_t *aio = channel->aio_send;
 
-       if(aio) {
-               /* We at least one AIO buffer. Send as much as possible form the first buffer. */
-               size_t left = aio->len - aio->done;
+       while(channel->aio_send) {
+               if(!len) {
+                       /* This poll callback signalled an error, abort all outstanding AIO buffers. */
+                       aio_abort(mesh, channel, &channel->aio_send);
+                       break;
+               }
+
+               /* We have at least one AIO buffer. Send as much as possible from the buffers. */
+               meshlink_aio_buffer_t *aio = channel->aio_send;
+               size_t todo = aio->len - aio->done;
                ssize_t sent;
 
-               if(len > left) {
-                       len = left;
+               if(todo > len) {
+                       todo = len;
                }
 
                if(aio->data) {
-                       sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+                       sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
                } else {
-                       char buf[65536];
-                       size_t todo = utcp_get_sndbuf_free(connection);
-
-                       if(todo > left) {
-                               todo = left;
-                       }
-
-                       if(todo > sizeof(buf)) {
-                               todo = sizeof(buf);
-                       }
-
+                       char buf[todo];
                        ssize_t result = read(aio->fd, buf, todo);
 
                        if(result > 0) {
-                               sent = utcp_send(connection, buf, result);
+                               todo = result;
+                               sent = utcp_send(connection, buf, todo);
                        } else {
-                               sent = result;
+                               /* Reading from fd failed, cancel just this AIO buffer. */
+                               if(result != 0) {
+                                       logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
+                               }
+
+                               channel->aio_send = aio->next;
+                               aio_signal(mesh, channel, aio);
+                               free(aio);
+                               aio = channel->aio_send;
+                               continue;
                        }
                }
 
-               if(sent >= 0) {
-                       aio->done += sent;
+               if(sent != (ssize_t)todo) {
+                       /* We should never get a partial send at this point */
+                       assert(sent < 0);
+
+                       /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
+                       aio_abort(mesh, channel, &channel->aio_send);
+                       len = 0;
+                       break;
                }
 
-               /* If the buffer is now completely sent, call the callback and dispose of it. */
-               if(aio->done >= aio->len) {
-                       channel->aio_send = aio->next;
-                       aio_signal(mesh, channel, aio);
-                       free(aio);
+               aio->done += sent;
+               len -= sent;
+
+               /* If we didn't finish this buffer, exit early. */
+               if(aio->done < aio->len) {
+                       return;
                }
-       } else {
-               if(channel->poll_cb) {
-                       channel->poll_cb(mesh, channel, len);
-               } else {
-                       utcp_set_poll_cb(connection, NULL);
+
+               /* Signal completion of this buffer, and go to the next one. */
+               channel->aio_send = aio->next;
+               aio_signal(mesh, channel, aio);
+               free(aio);
+
+               if(!len) {
+                       return;
                }
        }
+
+       if(channel->poll_cb) {
+               channel->poll_cb(mesh, channel, len);
+       } else {
+               utcp_set_poll_cb(connection, NULL);
+       }
 }
 
 void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {
@@ -3794,17 +3837,8 @@ void meshlink_channel_close(meshlink_handle_t *mesh, meshlink_channel_t *channel
        utcp_close(channel->c);
 
        /* Clean up any outstanding AIO buffers. */
-       for(meshlink_aio_buffer_t *aio = channel->aio_send, *next; aio; aio = next) {
-               next = aio->next;
-               aio_signal(mesh, channel, aio);
-               free(aio);
-       }
-
-       for(meshlink_aio_buffer_t *aio = channel->aio_receive, *next; aio; aio = next) {
-               next = aio->next;
-               aio_signal(mesh, channel, aio);
-               free(aio);
-       }
+       aio_abort(mesh, channel, &channel->aio_send);
+       aio_abort(mesh, channel, &channel->aio_receive);
 
        pthread_mutex_unlock(&mesh->mutex);
 
@@ -3881,7 +3915,11 @@ bool meshlink_channel_aio_send(meshlink_handle_t *mesh, meshlink_channel_t *chan
 
        /* Ensure the poll callback is set, and call it right now to push data if possible */
        utcp_set_poll_cb(channel->c, channel_poll);
-       channel_poll(channel->c, len);
+       size_t todo = MIN(len, utcp_get_rcvbuf_free(channel->c));
+
+       if(todo) {
+               channel_poll(channel->c, todo);
+       }
 
        pthread_mutex_unlock(&mesh->mutex);
 
@@ -3918,7 +3956,11 @@ bool meshlink_channel_aio_fd_send(meshlink_handle_t *mesh, meshlink_channel_t *c
 
        /* Ensure the poll callback is set, and call it right now to push data if possible */
        utcp_set_poll_cb(channel->c, channel_poll);
-       channel_poll(channel->c, len);
+       size_t left = utcp_get_rcvbuf_free(channel->c);
+
+       if(left) {
+               channel_poll(channel->c, left);
+       }
 
        pthread_mutex_unlock(&mesh->mutex);