Closing a channel while there was data in the receive buffer would cause a
RST to be sent instead of a FIN. We now always send a FIN, and let data
in the receive buffer be handled for a later data handling (which would
then send a RST if necessary).
The RST could be dropped if the ACK seqno was not in the correct range.
We now always accept RSTs for established connections.
Finally, when receiving more data after closing the channel, we would just
accept the data but discard it, instead of sending a RST back. Now we do
send a RST back.
}
if(sent != (ssize_t)todo) {
}
if(sent != (ssize_t)todo) {
- /* We should never get a partial send at this point */
- assert(sent <= 0);
-
/* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
if(!aio_abort(mesh, channel, &channel->aio_send)) {
return;
/* Sending failed, abort all outstanding AIO buffers and send a poll callback. */
if(!aio_abort(mesh, channel, &channel->aio_send)) {
return;
if(c->state == CLOSED) {
debug(c, "got packet for closed connection\n");
if(c->state == CLOSED) {
debug(c, "got packet for closed connection\n");
}
// It is for an existing connection.
}
// It is for an existing connection.
- if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
- debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
-
- // Ignore unacceptable RST packets.
- if(hdr.ctl & RST) {
- return 0;
- }
-
- goto reset;
- }
-
// 2. Handle RST packets
if(hdr.ctl & RST) {
// 2. Handle RST packets
if(hdr.ctl & RST) {
+ if(seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0) {
+ debug(c, "packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
+ goto reset;
+ }
+
advanced = seqdiff(hdr.ack, c->snd.una);
if(advanced) {
advanced = seqdiff(hdr.ack, c->snd.una);
if(advanced) {
- if(c->reapable && !is_reliable(c)) {
- // TODO: we should also send RST for reliable connections
- goto reset;
- }
-
// RTT measurement
if(c->rtt_start.tv_sec) {
if(c->rtt_seq == hdr.ack) {
// RTT measurement
if(c->rtt_start.tv_sec) {
if(c->rtt_seq == hdr.ack) {
return 0;
case ESTABLISHED:
return 0;
case ESTABLISHED:
case FIN_WAIT_1:
case FIN_WAIT_2:
case FIN_WAIT_1:
case FIN_WAIT_2:
+ if(c->reapable) {
+ // We already closed the connection and are not interested in more data.
+ goto reset;
+ }
+
hdr.src = c->src;
hdr.dst = c->dst;
hdr.seq = c->snd.nxt;
hdr.src = c->src;
hdr.dst = c->dst;
hdr.seq = c->snd.nxt;
hdr.wnd = 0;
hdr.ctl = RST;
hdr.wnd = 0;
hdr.ctl = RST;
}
int utcp_close(struct utcp_connection *c) {
}
int utcp_close(struct utcp_connection *c) {
- if(c->rcvbuf.used) {
- return reset_connection(c) ? 0 : -1;
- }
-
if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
return -1;
}
if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
return -1;
}
#include "meshlink.h"
#include "utils.h"
#include "meshlink.h"
#include "utils.h"
-static const size_t size = 10000000; // size of data to transfer
+static const size_t size = 12000000; // size of data to transfer
struct aio_info {
int port;
struct aio_info {
int port;
case 4:
assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]));
assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]));
case 4:
assert(meshlink_channel_aio_receive(mesh, channel, info->data, size / 4, aio_cb_close, &info->aio_infos[0]));
assert(meshlink_channel_aio_receive(mesh, channel, info->data + size / 4, size - size / 4, aio_cb, &info->aio_infos[1]));
- set_sync_flag(&info->aio_infos[1].flag, true);
if(i < 2) {
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb_close, &out_infos[i].aio_infos[1]));
if(i < 2) {
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb, &out_infos[i].aio_infos[0]));
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb_close, &out_infos[i].aio_infos[1]));
- assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
- assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
} else {
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb_close, &out_infos[i].aio_infos[0]));
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
} else {
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata, size / 3, aio_cb_close, &out_infos[i].aio_infos[0]));
assert(meshlink_channel_aio_send(mesh_a, channels[i], outdata + size / 3, size - size / 3, aio_cb, &out_infos[i].aio_infos[1]));
- assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
- set_sync_flag(&out_infos[i].aio_infos[1].flag, true);
}
}
// Wait for all AIO buffers to finish.
for(size_t i = 0; i < nchannels; i++) {
}
}
// Wait for all AIO buffers to finish.
for(size_t i = 0; i < nchannels; i++) {
+ // The first chunk should always have succeeded
assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
assert(wait_sync_flag(&in_infos[i].aio_infos[0].flag, 10));
- assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
assert(wait_sync_flag(&out_infos[i].aio_infos[0].flag, 10));
- assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+
+ // The second chunk should only have completed if we didn't close the channel yet
+ if(i % 2) {
+ assert(!check_sync_flag(&in_infos[i].aio_infos[1].flag));
+ } else {
+ assert(wait_sync_flag(&in_infos[i].aio_infos[1].flag, 10));
+ }
+
+ if(i < 2) {
+ assert(wait_sync_flag(&out_infos[i].aio_infos[1].flag, 10));
+ } else {
+ assert(!check_sync_flag(&out_infos[i].aio_infos[1].flag));
+ }
+
}
// Check that everything is correct.
}
// Check that everything is correct.
// Start MeshLink and wait for the channel to become connected.
start_meshlink_pair(a, b);
// Start MeshLink and wait for the channel to become connected.
start_meshlink_pair(a, b);
- assert(wait_sync_flag(&channel_opened, 5));
+ assert(wait_sync_flag(&channel_opened, 15));
// Re-initialize everything
meshlink_channel_close(a, channel);
// Re-initialize everything
meshlink_channel_close(a, channel);
assert(channel);
meshlink_set_channel_poll_cb(a, channel, poll_cb);
assert(channel);
meshlink_set_channel_poll_cb(a, channel, poll_cb);
- assert(wait_sync_flag(&channel_opened, 5));
+ assert(wait_sync_flag(&channel_opened, 15));
wait_sync_flag(&b_responded, 1);
wait_sync_flag(&b_closed, 1);
wait_sync_flag(&b_responded, 1);
wait_sync_flag(&b_closed, 1);
+ // Try to send data on a closed channel
+
+ for(int i = 0; i < 10; i++) {
+ if(meshlink_channel_send(a, channel, "Hello", 5) == -1) {
+ break;
+ }
+
+ assert(i != 9);
+ usleep(10000);
+ }
+
// Try to create a second channel
struct sync_flag channel_polled;
// Try to create a second channel
struct sync_flag channel_polled;