您现在的位置是:首页 >学无止境 >CS144 Lab3 The TCP Sender网站首页学无止境
CS144 Lab3 The TCP Sender
Lab2、3完成了TCP Receiver,接收不可靠TCPSegment,并将其重组为可靠的bytes stream。而Lab3则开始实现TCP Sender(即下图黄色部分),将一串bytes stream拆分成一个个TCP segment,交给IP层发送出去,同时还要追踪接受者接受情况,利用重传保证可靠性。
Lab4则会完成最后的TCPConnection部分。
思路
Sender需要负责写入所有Lab2中receiver需要读取的数据:the sequence number, the SYN flag, the payload, 和 the FIN flag. 需要读取receiver写入的数据:the ackno and the window size
即下图中,写入蓝色部分,读取红色部分。
Sender 需要实现以下四点:
- 追踪window size (根据收到receiver的window_size 和 ackno)
- 持续发送segment,直到window size被填满或bytes stream为空。
- 追踪被发送但未被receiver acknowledge 的segment。
- 超时后重新发送这些segment。
这个TCP采用了超时重传机制,即发送的segment超过一定时间没被acknoledge就重传,讲义中也写七个点介绍实现逻辑,这里也写了详细的流程,所以只提一下讲义未讲清楚的点。
-
只需要一个timer记录最早未ack的segment,不需要多个timer记录多个segment。
-
一个segment包含多个byte数据(Lab2可见是一个string),而receiver又是按byte来接收并ack的(lab2我也提到了),所以只有一个segment中string的每一个bytes都被ack,才能视作segment ack。
-
计时器超时后执行重传。当receiver window size大于零,则重传segment,并加倍timer等待时间和增加重传次数。但是,如果接收者 window size 为 0 ,表示接收者缓存空间不够了,不需要增加连续重传计数器和加倍 timer等待时间,只需要重启计时器即可。
增加连续重传次数:过多的重传次数可能意味着网络的中断,其上层的TCPConnection可选择停止重传。
RTO(Retransmittion Timeout)的值加倍:以降低较差网络环境的重传速度,以避免加深网络环境的拥堵
所以接收者 window size 为 0时,不意味着网络的中断,也不意味着网络环境差,所以不需要增加重传计数器和加倍RTO。
-
发送TCPSegment分两种,一种是首次发送,另一种是重发。注意的是,所有segment包括syn和fin都只能首次发送一次,如果没收到,利用重发机制来发送,而不能再重新走首次发送。syn很容易判断是否属于首次发送,只需要判断 _next_seqno == 0 即可。**但fin无法判断是否是首次发送,所以需要一个flag来记录首次发送fin。**以防止fill_window() 不断生成新的fin segment。
-
根据讲义Q&A部分,window_size 初始值为1。
实现
和Lab2一样,很多讲义没提到的繁琐的细节。不过也不难,先写个大概逻辑,然后根据测试修正实现细节即可。
我的实现大体思路:一个TCPSegment有两种被发送到的方式,首次发送和重发,每一个segment只可能被首次发送一次。首次发送的时候将其储存在一个我自己新建的std::queue中以便后续重发,收到ack后将其移出缓存的std::queue。同时,新建一个计时器timer类,tick会引发timer timeout,进而重发缓存中的一个segment。
头文件
class Timer {
private:
bool _open{false};
unsigned long long _passby_time{0};
unsigned int _retransmission_timeout{0};
unsigned int _retransmission_count{0};
public:
void set_retrans_timeout(unsigned int timeout) { _retransmission_timeout = timeout; }
void open() { _open = true; }
void close() { _open = false; }
unsigned int get_retransmission_count() const { return _retransmission_count; }
bool check_timeout(unsigned int time_elapsed, bool incr) {
if (!_open) {
return false;
}
_passby_time += time_elapsed;
if (_passby_time < _retransmission_timeout) {
return false;
}
// Timeout.
_passby_time = 0;
if (incr) {
_retransmission_timeout *= 2;
_retransmission_count += 1;
}
return true;
}
void reset(unsigned int timeout) {
_retransmission_count = 0;
_passby_time = 0;
_retransmission_timeout = timeout;
}
};
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
// _outstanding segments. Segments that sent but not commited.
std::queue<std::pair<uint64_t, TCPSegment>> _outstanding_segments{};
// Sent but not acknowledged bytes.
uint64_t _bytes_in_flight{0};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};
// Left window size
uint16_t _window_size{1};
// Timer for retransmission.
Timer _timer{};
// Sent fin flag.
bool _sent_fin{false};
... ...
}
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity) {
_timer.set_retrans_timeout(_initial_retransmission_timeout);
}
uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }
void TCPSender::fill_window() {
uint16_t left_window_size = std::max(std::uint16_t(1), _window_size);
left_window_size = left_window_size > _bytes_in_flight ? left_window_size - _bytes_in_flight : 0;
while (left_window_size > 0) {
uint16_t remain_seg_length = left_window_size;
TCPSegment tcp;
// Set TCP header.
tcp.header().seqno = next_seqno();
if (_next_seqno == 0 && remain_seg_length > 0) {
_timer.reset(_initial_retransmission_timeout);
_timer.open();
tcp.header().syn = 1;
remain_seg_length -= 1;
}
// Set TCP Payload.
if (remain_seg_length > 0) {
size_t payload_limit = std::min(static_cast<std::uint16_t>(TCPConfig::MAX_PAYLOAD_SIZE), remain_seg_length);
tcp.payload() = Buffer(_stream.peek_output(payload_limit));
_stream.pop_output(payload_limit);
remain_seg_length -= tcp.payload().size();
}
// set fin.
if (remain_seg_length > 0 && _stream.buffer_size() == 0 && _stream.eof() && !_sent_fin) {
tcp.header().fin = 1;
_sent_fin = true;
remain_seg_length -= 1;
}
// Update seg_length.
size_t seg_length = tcp.length_in_sequence_space();
left_window_size =
std::max(static_cast<std::uint16_t>(0), static_cast<std::uint16_t>(left_window_size - seg_length));
// Insert tcp.
if (seg_length > 0) {
_segments_out.push(tcp);
_outstanding_segments.emplace(_next_seqno, tcp);
_bytes_in_flight += seg_length;
// Update _next_seqno
_next_seqno += seg_length;
} else {
break;
}
}
}
//! param ackno The remote receiver's ackno (acknowledgment number)
//! param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t abs_ackno = unwrap(ackno, _isn, _next_seqno);
if (abs_ackno > _next_seqno) {
// Impossible ack.
return;
}
_window_size = window_size;
bool ack = false;
while (!_outstanding_segments.empty()) {
auto &p = _outstanding_segments.front();
if (p.first + p.second.length_in_sequence_space() - 1 < abs_ackno) {
_bytes_in_flight -= _outstanding_segments.front().second.length_in_sequence_space();
_outstanding_segments.pop();
ack = true;
} else {
break;
}
}
if (ack) {
_timer.reset(_initial_retransmission_timeout);
}
}
//! param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
if (_timer.check_timeout(ms_since_last_tick, _window_size != 0) && !_outstanding_segments.empty()) {
_segments_out.push(_outstanding_segments.front().second);
}
}
unsigned int TCPSender::consecutive_retransmissions() const { return _timer.get_retransmission_count(); }
void TCPSender::send_empty_segment() {
TCPSegment seg;
seg.header().seqno = next_seqno();
_segments_out.emplace(std::move(seg));
}
输入 make check_lab3 即可通过全部测试。