]> git.meshlink.io Git - meshlink/commitdiff
Handle multiple AIO buffers at once in one callback.
authorGuus Sliepen <guus@meshlink.io>
Sun, 3 May 2020 20:35:58 +0000 (22:35 +0200)
committerGuus Sliepen <guus@meshlink.io>
Sun, 3 May 2020 20:44:42 +0000 (22:44 +0200)
This also cancels an AIO buffer when there is an error reading from or
writing to a filedescriptor.

src/meshlink.c

index 5072696b9c36207ac96b8137028f98c1caad0549..db958afa037888fd4248a37f2bb84b8af543e6c5 100644 (file)
@@ -3513,12 +3513,21 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                } else {
                        ssize_t result = write(aio->fd, p, todo);
 
-                       if(result > 0) {
-                               todo = result;
+                       if(result <= 0) {
+                               /* Writing to fd failed, cancel just this AIO buffer. */
+                               logger(mesh, MESHLINK_ERROR, "Writing to AIO fd %d failed: %s", aio->fd, strerror(errno));
+                               channel->aio_receive = aio->next;
+                               aio_signal(mesh, channel, aio);
+                               free(aio);
+                               continue;
                        }
+
+                       todo = result;
                }
 
                aio->done += todo;
+               p += todo;
+               left -= todo;
 
                if(aio->done == aio->len) {
                        channel->aio_receive = aio->next;
@@ -3526,10 +3535,7 @@ static ssize_t channel_recv(struct utcp_connection *connection, const void *data
                        free(aio);
                }
 
-               p += todo;
-               left -= todo;
-
-               if(!left && len) {
+               if(!left) {
                        return len;
                }
        }
@@ -3616,68 +3622,79 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
 
        node_t *n = channel->node;
        meshlink_handle_t *mesh = n->mesh;
-       meshlink_aio_buffer_t *aio = channel->aio_send;
 
-       if(aio) {
-               /* We at least one AIO buffer. Send as much as possible form the first buffer. */
-               size_t left = aio->len - aio->done;
+       while(channel->aio_send) {
                if(!len) {
                        /* This poll callback signalled an error, abort all outstanding AIO buffers. */
                        aio_abort(mesh, channel, &channel->aio_send);
-                       if(channel->poll_cb) {
-                               channel->poll_cb(mesh, channel, 0);
-                       } else {
-                               utcp_set_poll_cb(connection, NULL);
-                       }
-                       return;
+                       break;
                }
 
+               /* We have at least one AIO buffer. Send as much as possible from the buffers. */
+               meshlink_aio_buffer_t *aio = channel->aio_send;
+               size_t todo = aio->len - aio->done;
                ssize_t sent;
 
-               if(len > left) {
-                       len = left;
+               if(todo > len) {
+                       todo = len;
                }
 
                if(aio->data) {
-                       sent = utcp_send(connection, (char *)aio->data + aio->done, len);
+                       sent = utcp_send(connection, (char *)aio->data + aio->done, todo);
                } else {
-                       char buf[65536];
-                       size_t todo = utcp_get_sndbuf_free(connection);
-
-                       if(todo > left) {
-                               todo = left;
-                       }
-
-                       if(todo > sizeof(buf)) {
-                               todo = sizeof(buf);
-                       }
-
+                       char buf[todo];
                        ssize_t result = read(aio->fd, buf, todo);
 
                        if(result > 0) {
-                               sent = utcp_send(connection, buf, result);
+                               todo = result;
+                               sent = utcp_send(connection, buf, todo);
                        } else {
-                               sent = result;
+                               /* Reading from fd failed, cancel just this AIO buffer. */
+                               if(result != 0) {
+                                       logger(mesh, MESHLINK_ERROR, "Reading from AIO fd %d failed: %s", aio->fd, strerror(errno));
+                               }
+
+                               channel->aio_send = aio->next;
+                               aio_signal(mesh, channel, aio);
+                               free(aio);
+                               aio = channel->aio_send;
+                               continue;
                        }
                }
 
-               if(sent >= 0) {
-                       aio->done += sent;
+               if(sent != (ssize_t)todo) {
+                       /* We should never get a partial send at this point */
+                       assert(sent < 0);
+
+                       /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
+                       aio_abort(mesh, channel, &channel->aio_send);
+                       len = 0;
+                       break;
                }
 
-               /* If the buffer is now completely sent, call the callback and dispose of it. */
-               if(aio->done >= aio->len) {
-                       channel->aio_send = aio->next;
-                       aio_signal(mesh, channel, aio);
-                       free(aio);
+               aio->done += sent;
+               len -= sent;
+
+               /* If we didn't finish this buffer, exit early. */
+               if(aio->done < aio->len) {
+                       return;
                }
-       } else {
-               if(channel->poll_cb) {
-                       channel->poll_cb(mesh, channel, len);
-               } else {
-                       utcp_set_poll_cb(connection, NULL);
+
+               /* Signal completion of this buffer, and go to the next one. */
+               channel->aio_send = aio->next;
+               aio_signal(mesh, channel, aio);
+               free(aio);
+
+               if(!len) {
+                       return;
                }
        }
+
+       if(channel->poll_cb) {
+               channel->poll_cb(mesh, channel, len);
+       } else {
+               utcp_set_poll_cb(connection, NULL);
+       }
 }
 
 void meshlink_set_channel_poll_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, meshlink_channel_poll_cb_t cb) {