if(!utcp->srtt) {
utcp->srtt = rtt;
utcp->rttvar = rtt / 2;
- utcp->rto = rtt + max(2 * rtt, CLOCK_GRANULARITY);
} else {
utcp->rttvar = (utcp->rttvar * 3 + absdiff(utcp->srtt, rtt)) / 4;
utcp->srtt = (utcp->srtt * 7 + rtt) / 8;
- utcp->rto = utcp->srtt + max(utcp->rttvar, CLOCK_GRANULARITY);
}
+ utcp->rto = utcp->srtt + max(4 * utcp->rttvar, CLOCK_GRANULARITY);
+
if(utcp->rto > MAX_RTO) {
utcp->rto = MAX_RTO;
}
return NULL;
}
- assert((flags & ~0xf) == 0);
+ assert((flags & ~0x1f) == 0);
c->flags = flags;
c->recv = recv;
return -1;
}
+ // Check if we need to be able to buffer all data
+
+ if(c->flags & UTCP_NO_PARTIAL) {
+ if(len > buffer_free(&c->sndbuf)) {
+ if(len > c->sndbuf.maxsize) {
+ errno = EMSGSIZE;
+ return -1;
+ } else {
+ errno = EWOULDBLOCK;
+ return 0;
+ }
+ }
+ }
+
// Add data to send buffer.
- len = buffer_put(&c->sndbuf, data, len);
+ if(is_reliable(c) || (c->state != SYN_SENT && c->state != SYN_RECEIVED)) {
+ len = buffer_put(&c->sndbuf, data, len);
+ }
if(len <= 0) {
- errno = EWOULDBLOCK;
- return 0;
+ if(is_reliable(c)) {
+ errno = EWOULDBLOCK;
+ return 0;
+ } else {
+ return len;
+ }
}
c->snd.last += len;
// Don't send anything yet if the connection has not fully established yet
- if (c->state == SYN_SENT || c->state == SYN_RECEIVED)
+ if(c->state == SYN_SENT || c->state == SYN_RECEIVED) {
return len;
+ }
ack(c, false);
start_retransmit_timer(c);
}
+ if(is_reliable(c) && !timerisset(&c->conn_timeout)) {
+ gettimeofday(&c->conn_timeout, NULL);
+ c->conn_timeout.tv_sec += c->utcp->timeout;
+ }
+
return len;
}
ptr += 2;
}
+ bool has_data = len || (hdr.ctl & (SYN | FIN));
+
// Try to match the packet to an existing connection
struct utcp_connection *c = find_connection(utcp, hdr.dst, hdr.src);
c->flags = UTCP_TCP;
}
+synack:
// Return SYN+ACK, go to SYN_RECEIVED state
c->snd.wnd = hdr.wnd;
c->rcv.irs = hdr.seq;
// It is for an existing connection.
- uint32_t prevrcvnxt = c->rcv.nxt;
-
// 1. Drop invalid packets.
// 1a. Drop packets that should not happen in our current state.
break;
}
- // 1b. Drop packets with a sequence number not in our receive window.
+ // 1b. Discard data that is not in our receive window.
- bool acceptable;
+ if(is_reliable(c)) {
+ bool acceptable;
- if(c->state == SYN_SENT) {
- acceptable = true;
- } else if(len == 0) {
- acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
- } else {
- int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
+ if(c->state == SYN_SENT) {
+ acceptable = true;
+ } else if(len == 0) {
+ acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0;
+ } else {
+ int32_t rcv_offset = seqdiff(hdr.seq, c->rcv.nxt);
- // cut already accepted front overlapping
- if(rcv_offset < 0) {
- acceptable = len > (size_t) - rcv_offset;
+ // cut already accepted front overlapping
+ if(rcv_offset < 0) {
+ acceptable = len > (size_t) - rcv_offset;
- if(acceptable) {
- ptr -= rcv_offset;
- len += rcv_offset;
- hdr.seq -= rcv_offset;
+ if(acceptable) {
+ ptr -= rcv_offset;
+ len += rcv_offset;
+ hdr.seq -= rcv_offset;
+ }
+ } else {
+ acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
}
- } else {
- acceptable = seqdiff(hdr.seq, c->rcv.nxt) >= 0 && seqdiff(hdr.seq, c->rcv.nxt) + len <= c->rcvbuf.maxsize;
}
- }
- if(!acceptable) {
- debug("Packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
+ if(!acceptable) {
+ debug("Packet not acceptable, %u <= %u + %lu < %u\n", c->rcv.nxt, hdr.seq, (unsigned long)len, c->rcv.nxt + c->rcvbuf.maxsize);
- // Ignore unacceptable RST packets.
- if(hdr.ctl & RST) {
- return 0;
- }
+ // Ignore unacceptable RST packets.
+ if(hdr.ctl & RST) {
+ return 0;
+ }
- // Otherwise, continue processing.
- len = 0;
+ // Otherwise, continue processing.
+ len = 0;
+ }
}
c->snd.wnd = hdr.wnd; // TODO: move below
// ackno should not roll back, and it should also not be bigger than what we ever could have sent
// (= snd.una + c->sndbuf.used).
+ if(!is_reliable(c)) {
+ if(hdr.ack != c->snd.last && c->state >= ESTABLISHED) {
+ hdr.ack = c->snd.una;
+ }
+ }
+
if(hdr.ctl & ACK && (seqdiff(hdr.ack, c->snd.last) > 0 || seqdiff(hdr.ack, c->snd.una) < 0)) {
debug("Packet ack seqno out of range, %u <= %u < %u\n", c->snd.una, hdr.ack, c->snd.una + c->sndbuf.used);
c->recv(c, NULL, 0);
}
+ if(c->poll && !c->reapable) {
+ c->poll(c, 0);
+ }
+
return 0;
case SYN_RECEIVED:
c->recv(c, NULL, 0);
}
+ if(c->poll && !c->reapable) {
+ c->poll(c, 0);
+ }
+
return 0;
case CLOSING:
// 3. Advance snd.una
advanced = seqdiff(hdr.ack, c->snd.una);
- prevrcvnxt = c->rcv.nxt;
if(advanced) {
// RTT measurement
case CLOSING:
if(c->snd.una == c->snd.last) {
gettimeofday(&c->conn_timeout, NULL);
- c->conn_timeout.tv_sec += 60;
+ c->conn_timeout.tv_sec += utcp->timeout;
set_state(c, TIME_WAIT);
}
// 4. Update timers
if(advanced) {
- timerclear(&c->conn_timeout); // It will be set anew in utcp_timeout() if c->snd.una != c->snd.nxt.
-
if(c->snd.una == c->snd.last) {
stop_retransmit_timer(c);
+ timerclear(&c->conn_timeout);
} else if(is_reliable(c)) {
start_retransmit_timer(c);
+ gettimeofday(&c->conn_timeout, NULL);
+ c->conn_timeout.tv_sec += utcp->timeout;
}
}
c->rcv.irs = hdr.seq;
c->rcv.nxt = hdr.seq;
+
if(c->shut_wr) {
c->snd.last++;
set_state(c, FIN_WAIT_1);
} else {
set_state(c, ESTABLISHED);
}
+
// TODO: notify application of this somehow.
break;
case SYN_RECEIVED:
+ // This is a retransmit of a SYN, send back the SYNACK.
+ goto synack;
+
case ESTABLISHED:
case FIN_WAIT_1:
case FIN_WAIT_2:
// 7. Process FIN stuff
- if((hdr.ctl & FIN) && hdr.seq + len == c->rcv.nxt) {
+ if((hdr.ctl & FIN) && (!is_reliable(c) || hdr.seq + len == c->rcv.nxt)) {
switch(c->state) {
case SYN_SENT:
case SYN_RECEIVED:
case FIN_WAIT_2:
gettimeofday(&c->conn_timeout, NULL);
- c->conn_timeout.tv_sec += 60;
+ c->conn_timeout.tv_sec += utcp->timeout;
set_state(c, TIME_WAIT);
break;
c->rcv.nxt++;
len++;
- // Inform the application that the peer closed the connection.
+ // Inform the application that the peer closed its end of the connection.
if(c->recv) {
errno = 0;
c->recv(c, NULL, 0);
}
// Now we send something back if:
- // - we advanced rcv.nxt (ie, we got some data that needs to be ACKed)
+ // - we received data, so we have to send back an ACK
// -> sendatleastone = true
// - or we got an ack, so we should maybe send a bit more data
// -> sendatleastone = false
- ack(c, len || prevrcvnxt != c->rcv.nxt);
+ if(is_reliable(c) || hdr.ctl & SYN || hdr.ctl & FIN) {
+ ack(c, has_data);
+ }
+
return 0;
reset:
}
// Only process shutting down writes once.
- if (c->shut_wr)
+ if(c->shut_wr) {
return 0;
+ }
c->shut_wr = true;
return 0;
}
-int utcp_close(struct utcp_connection *c) {
- if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
- return -1;
- }
-
- c->recv = NULL;
- c->poll = NULL;
- c->reapable = true;
- return 0;
-}
-
-int utcp_abort(struct utcp_connection *c) {
+static bool reset_connection(struct utcp_connection *c) {
if(!c) {
errno = EFAULT;
- return -1;
+ return false;
}
if(c->reapable) {
debug("Error: abort() called on closed connection %p\n", c);
errno = EBADF;
- return -1;
+ return false;
}
c->recv = NULL;
c->poll = NULL;
- c->reapable = true;
switch(c->state) {
case CLOSED:
- return 0;
+ return true;
case LISTEN:
case SYN_SENT:
case LAST_ACK:
case TIME_WAIT:
set_state(c, CLOSED);
- return 0;
+ return true;
case SYN_RECEIVED:
case ESTABLISHED:
print_packet(c->utcp, "send", &hdr, sizeof(hdr));
c->utcp->send(c->utcp, &hdr, sizeof(hdr));
+ return true;
+}
+
+// Closes all the opened connections
+void utcp_abort_all_connections(struct utcp *utcp) {
+ if(!utcp) {
+ errno = EINVAL;
+ return;
+ }
+
+ for(int i = 0; i < utcp->nconnections; i++) {
+ struct utcp_connection *c = utcp->connections[i];
+
+ if(c->reapable || c->state == CLOSED) {
+ continue;
+ }
+
+ utcp_recv_t old_recv = c->recv;
+ utcp_poll_t old_poll = c->poll;
+
+ reset_connection(c);
+
+ if(old_recv) {
+ errno = 0;
+ old_recv(c, NULL, 0);
+ }
+
+ if(old_poll && !c->reapable) {
+ errno = 0;
+ old_poll(c, 0);
+ }
+ }
+
+ return;
+}
+
+int utcp_close(struct utcp_connection *c) {
+ if(utcp_shutdown(c, SHUT_RDWR) && errno != ENOTCONN) {
+ return -1;
+ }
+
+ c->recv = NULL;
+ c->poll = NULL;
+ c->reapable = true;
+ return 0;
+}
+
+int utcp_abort(struct utcp_connection *c) {
+ if(!reset_connection(c)) {
+ return -1;
+ }
+
+ c->reapable = true;
return 0;
}
c->recv(c, NULL, 0);
}
+ if(c->poll && !c->reapable) {
+ c->poll(c, 0);
+ }
+
continue;
}
for(int i = 0; i < utcp->nconnections; i++) {
struct utcp_connection *c = utcp->connections[i];
- if(!c->reapable)
+ if(!c->reapable) {
if(c->recv) {
c->recv(c, NULL, 0);
}
+ if(c->poll && !c->reapable) {
+ c->poll(c, 0);
+ }
+ }
+
buffer_exit(&c->rcvbuf);
buffer_exit(&c->sndbuf);
free(c);
then.tv_sec += utcp->timeout;
for(int i = 0; i < utcp->nconnections; i++) {
- utcp->connections[i]->rtrx_timeout = now;
- utcp->connections[i]->conn_timeout = then;
- utcp->connections[i]->rtt_start.tv_sec = 0;
+ struct utcp_connection *c = utcp->connections[i];
+
+ if(c->reapable) {
+ continue;
+ }
+
+ if(timerisset(&c->rtrx_timeout)) {
+ c->rtrx_timeout = now;
+ }
+
+ if(timerisset(&c->conn_timeout)) {
+ c->conn_timeout = then;
+ }
+
+ c->rtt_start.tv_sec = 0;
}
if(utcp->rto > START_RTO) {
}
size_t utcp_get_sndbuf_free(struct utcp_connection *c) {
- if (!c)
+ if(!c) {
return 0;
+ }
switch(c->state) {
case SYN_SENT:
}
}
+size_t utcp_get_sendq(struct utcp_connection *c) {
+ return c->sndbuf.used;
+}
+
+size_t utcp_get_recvq(struct utcp_connection *c) {
+ return c->rcvbuf.used;
+}
+
bool utcp_get_nodelay(struct utcp_connection *c) {
return c ? c->nodelay : false;
}
utcp->pre_accept = pre_accept;
}
}
+
+void utcp_expect_data(struct utcp_connection *c, bool expect) {
+ if(!c || c->reapable) {
+ return;
+ }
+
+ if(!(c->state == ESTABLISHED || c->state == FIN_WAIT_1 || c->state == FIN_WAIT_2)) {
+ return;
+ }
+
+ if(expect) {
+ // If we expect data, start the connection timer.
+ if(!timerisset(&c->conn_timeout)) {
+ gettimeofday(&c->conn_timeout, NULL);
+ c->conn_timeout.tv_sec += c->utcp->timeout;
+ }
+ } else {
+ // If we want to cancel expecting data, only clear the timer when there is no unACKed data.
+ if(c->snd.una == c->snd.last) {
+ timerclear(&c->conn_timeout);
+ }
+ }
+}
+
+void utcp_offline(struct utcp *utcp, bool offline) {
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ for(int i = 0; i < utcp->nconnections; i++) {
+ struct utcp_connection *c = utcp->connections[i];
+
+ if(c->reapable) {
+ continue;
+ }
+
+ utcp_expect_data(c, offline);
+
+ if(!offline) {
+ if(timerisset(&c->rtrx_timeout)) {
+ c->rtrx_timeout = now;
+ }
+
+ utcp->connections[i]->rtt_start.tv_sec = 0;
+ }
+ }
+
+ if(!offline && utcp->rto > START_RTO) {
+ utcp->rto = START_RTO;
+ }
+}