]> git.meshlink.io Git - meshlink/commitdiff
Fix cornercases closing channels.
authorGuus Sliepen <guus@meshlink.io>
Wed, 30 Sep 2020 20:16:22 +0000 (22:16 +0200)
committerGuus Sliepen <guus@meshlink.io>
Wed, 30 Sep 2020 20:16:22 +0000 (22:16 +0200)
Closing a channel while there was data in the receive buffer would cause a
RST to be sent instead of a FIN. We now always send a FIN, and let data
in the receive buffer be handled for a later data handling (which would
then send a RST if necessary).

The RST could be dropped if the ACK seqno was not in the correct range.
We now always accept RSTs for established connections.

Finally, when receiving more data after closing the channel, we would just
accept the data but discard it, instead of sending a RST back. Now we do
send a RST back.

src/meshlink.c
src/utcp.c
test/channels-aio-cornercases.c
test/channels-udp-cornercases.c

index 56da2a045452acf8004b0d0a04cbf1e7e2810ceb..a8ed1fad3d20a0d39db9ad5f9c7a67763d904e6f 100644 (file)
@@ -3904,9 +3904,6 @@ static void channel_poll(struct utcp_connection *connection, size_t len) {
                }
 
                if(sent != (ssize_t)todo) {
                }
 
                if(sent != (ssize_t)todo) {
-                       /* We should never get a partial send at this point */
-                       assert(sent <= 0);
-
                        /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
                        if(!aio_abort(mesh, channel, &channel->aio_send)) {
                                return;
                        /* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
                        if(!aio_abort(mesh, channel, &channel->aio_send)) {
                                return;
index c1add21808cd51031c07b5919b5aa90c4b1f84c7..20dd0aba049757ca30833779ec772c63ba685310 100644 (file)
@@ -1368,7 +1368,7 @@ synack:
 
        if(c->state == CLOSED) {
                debug(c, "got packet for closed connection\n");
 
        if(c->state == CLOSED) {
                debug(c, "got packet for closed connection\n");
-               return 0;
+               goto reset;
        }
 
        // It is for an existing connection.
        }
 
        // It is for an existing connection.
@@ -1456,17 +1456,6 @@ synack:
                }
        }
 
                }
        }
 
-       if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
-               debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
-
-               // Ignore unacceptable RST packets.
-               if(hdr.ctl & RST) {
-                       return 0;
-               }
-
-               goto reset;
-       }
-
        // 2. Handle RST packets
 
        if(hdr.ctl & RST) {
        // 2. Handle RST packets
 
        if(hdr.ctl & RST) {
@@ -1556,14 +1545,14 @@ synack:
 
        // 3. Advance snd.una
 
 
        // 3. Advance snd.una
 
+       if(seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0) {
+               debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
+               goto reset;
+       }
+
        advanced = seqdiff(hdr.ack, c->snd.una);
 
        if(advanced) {
        advanced = seqdiff(hdr.ack, c->snd.una);
 
        if(advanced) {
-               if(c->reapable && !is_reliable(c)) {
-                       // TODO: we should also send RST for reliable connections
-                       goto reset;
-               }
-
                // RTT measurement
                if(c->rtt_start.tv_sec) {
                        if(c->rtt_seq == hdr.ack) {
                // RTT measurement
                if(c->rtt_start.tv_sec) {
                        if(c->rtt_seq == hdr.ack) {
@@ -1783,8 +1772,15 @@ skip_ack:
                        return 0;
 
                case ESTABLISHED:
                        return 0;
 
                case ESTABLISHED:
+                       break;
+
                case FIN_WAIT_1:
                case FIN_WAIT_2:
                case FIN_WAIT_1:
                case FIN_WAIT_2:
+                       if(c->reapable) {
+                               // We already closed the connection and are not interested in more data.
+                               goto reset;
+                       }
+
                        break;
 
                case CLOSE_WAIT:
                        break;
 
                case CLOSE_WAIT:
@@ -2006,7 +2002,7 @@ static bool reset_connection(struct utcp_connection *c) {
        hdr.src = c->src;
        hdr.dst = c->dst;
        hdr.seq = c->snd.nxt;
        hdr.src = c->src;
        hdr.dst = c->dst;
        hdr.seq = c->snd.nxt;
-       hdr.ack = 0;
+       hdr.ack = c->rcv.nxt;
        hdr.wnd = 0;
        hdr.ctl = RST;
 
        hdr.wnd = 0;
        hdr.ctl = RST;
 
@@ -2049,10 +2045,6 @@ void utcp_abort_all_connections(struct utcp *utcp) {
 }
 
 int utcp_close(struct utcp_connection *c) {
 }
 
 int utcp_close(struct utcp_connection *c) {
-       if(c->rcvbuf.used) {
-               return reset_connection(c) ? 0 : -1;
-       }
-
        if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
                return -1;
        }
        if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
                return -1;
        }
index dba4de51e11cc3dde228c20fe683f16694a3b84f..208c9553da08b9f1b6ea6c1155fc55af4995ffd2 100644 (file)
@@ -12,7 +12,7 @@
 #include "meshlink.h"
 #include "utils.h"
 
 #include "meshlink.h"
 #include "utils.h"
 
-static const size_t size = 10000000; // size of data to transfer
+static const size_t size = 12000000; // size of data to transfer
 
 struct aio_info {
        int port;
 
 struct aio_info {
        int port;
@@ -72,7 +72,6 @@ static bool accept_cb(meshlink_handle_t *mesh, meshlink_channel_t *channel, uint
        case 4:
                assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]));
                assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]));
        case 4:
                assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]));
                assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]));
-               set_sync_flag(&info->aio_infos[1].flag, true);
                break;
 
        default:
                break;
 
        default:
@@ -151,23 +150,32 @@ int main(void) {
                if(i < 2) {
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb_close, &out_infos[i].aio_infos[1]));
                if(i < 2) {
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb_close, &out_infos[i].aio_infos[1]));
-                       assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
-                       assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
                } else {
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb_close, &out_infos[i].aio_infos[0]));
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
                } else {
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb_close, &out_infos[i].aio_infos[0]));
                        assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
-                       assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
-                       set_sync_flag(&out_infos[i].aio_infos[1].flag, true);
                }
        }
 
        // Wait for all AIO buffers to finish.
 
        for(size_t i = 0; i < nchannels; i++) {
                }
        }
 
        // Wait for all AIO buffers to finish.
 
        for(size_t i = 0; i < nchannels; i++) {
+               // The first chunk should always have succeeded
                assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
                assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
-               assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
                assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
                assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
-               assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+
+               // The second chunk should only have completed if we didn't close the channel yet
+               if(i % 2) {
+                       assert(!check_sync_flag(&in_infos[i].aio_infos[1].flag));
+               } else {
+                       assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
+               }
+
+               if(i < 2) {
+                       assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+               } else {
+                       assert(!check_sync_flag(&out_infos[i].aio_infos[1].flag));
+               }
+
        }
 
        // Check that everything is correct.
        }
 
        // Check that everything is correct.
index ac373be6f388edd9921b6e666c0a1517edd81c11..0ae3755833f365ad7381958998b82c37754c44af 100644 (file)
@@ -101,7 +101,7 @@ int main(void) {
        // Start MeshLink and wait for the channel to become connected.
        start_meshlink_pair(a, b);
 
        // Start MeshLink and wait for the channel to become connected.
        start_meshlink_pair(a, b);
 
-       assert(wait_sync_flag(&channel_opened, 5));
+       assert(wait_sync_flag(&channel_opened, 15));
 
        // Re-initialize everything
        meshlink_channel_close(a, channel);
 
        // Re-initialize everything
        meshlink_channel_close(a, channel);
@@ -123,7 +123,7 @@ int main(void) {
        assert(channel);
        meshlink_set_channel_poll_cb(a, channel, poll_cb);
 
        assert(channel);
        meshlink_set_channel_poll_cb(a, channel, poll_cb);
 
-       assert(wait_sync_flag(&channel_opened, 5));
+       assert(wait_sync_flag(&channel_opened, 15));
 
        // Send a message to b
 
 
        // Send a message to b
 
@@ -144,6 +144,17 @@ int main(void) {
        wait_sync_flag(&b_responded, 1);
        wait_sync_flag(&b_closed, 1);
 
        wait_sync_flag(&b_responded, 1);
        wait_sync_flag(&b_closed, 1);
 
+       // Try to send data on a closed channel
+
+       for(int i = 0; i < 10; i++) {
+               if(meshlink_channel_send(a, channel, "Hello", 5) == -1) {
+                       break;
+               }
+
+               assert(i != 9);
+               usleep(10000);
+       }
+
        // Try to create a second channel
 
        struct sync_flag channel_polled;
        // Try to create a second channel
 
        struct sync_flag channel_polled;