您现在的位置是:首页 >学无止境 >CS144 Lab3 The TCP Sender网站首页学无止境

CS144 Lab3 The TCP Sender

L.A雨夜充盈的寒冷 2025-02-20 12:01:02
简介CS144 Lab3 The TCP Sender

Lab2、3完成了TCP Receiver,接收不可靠TCPSegment,并将其重组为可靠的bytes stream。而Lab3则开始实现TCP Sender(即下图黄色部分),将一串bytes stream拆分成一个个TCP segment,交给IP层发送出去,同时还要追踪接受者接受情况,利用重传保证可靠性。
Lab4则会完成最后的TCPConnection部分。

请添加图片描述

思路

Sender需要负责写入所有Lab2中receiver需要读取的数据:the sequence number, the SYN flag, the payload, 和 the FIN flag. 需要读取receiver写入的数据:the ackno and the window size
即下图中,写入蓝色部分,读取红色部分。

请添加图片描述
Sender 需要实现以下四点:

  1. 追踪window size (根据收到receiver的window_size 和 ackno)
  2. 持续发送segment,直到window size被填满或bytes stream为空。
  3. 追踪被发送但未被receiver acknowledge 的segment。
  4. 超时后重新发送这些segment。

这个TCP采用了超时重传机制,即发送的segment超过一定时间没被acknoledge就重传,讲义中也写七个点介绍实现逻辑,这里也写了详细的流程,所以只提一下讲义未讲清楚的点。

  1. 只需要一个timer记录最早未ack的segment,不需要多个timer记录多个segment。

  2. 一个segment包含多个byte数据(Lab2可见是一个string),而receiver又是按byte来接收并ack的(lab2我也提到了),所以只有一个segment中string的每一个bytes都被ack,才能视作segment ack。

  3. 计时器超时后执行重传。当receiver window size大于零,则重传segment,并加倍timer等待时间和增加重传次数。但是,如果接收者 window size 为 0 ,表示接收者缓存空间不够了,不需要增加连续重传计数器和加倍 timer等待时间,只需要重启计时器即可。

增加连续重传次数:过多的重传次数可能意味着网络的中断,其上层的TCPConnection可选择停止重传。
RTO(Retransmittion Timeout)的值加倍:以降低较差网络环境的重传速度,以避免加深网络环境的拥堵

所以接收者 window size 为 0时,不意味着网络的中断,也不意味着网络环境差,所以不需要增加重传计数器和加倍RTO。

  1. 发送TCPSegment分两种,一种是首次发送,另一种是重发。注意的是,所有segment包括syn和fin都只能首次发送一次,如果没收到,利用重发机制来发送,而不能再重新走首次发送。syn很容易判断是否属于首次发送,只需要判断 _next_seqno == 0 即可。**但fin无法判断是否是首次发送,所以需要一个flag来记录首次发送fin。**以防止fill_window() 不断生成新的fin segment。

  2. 根据讲义Q&A部分,window_size 初始值为1。

实现

和Lab2一样,很多讲义没提到的繁琐的细节。不过也不难,先写个大概逻辑,然后根据测试修正实现细节即可。

我的实现大体思路:一个TCPSegment有两种被发送到的方式,首次发送重发,每一个segment只可能被首次发送一次。首次发送的时候将其储存在一个我自己新建的std::queue中以便后续重发,收到ack后将其移出缓存的std::queue。同时,新建一个计时器timer类,tick会引发timer timeout,进而重发缓存中的一个segment。

头文件

class Timer {
  private:
    bool _open{false};
    unsigned long long _passby_time{0};
    unsigned int _retransmission_timeout{0};
    unsigned int _retransmission_count{0};

  public:
    void set_retrans_timeout(unsigned int timeout) { _retransmission_timeout = timeout; }

    void open() { _open = true; }

    void close() { _open = false; }

    unsigned int get_retransmission_count() const { return _retransmission_count; }

    bool check_timeout(unsigned int time_elapsed, bool incr) {
        if (!_open) {
            return false;
        }

        _passby_time += time_elapsed;
        if (_passby_time < _retransmission_timeout) {
            return false;
        }
        
        // Timeout.
        _passby_time = 0;
        if (incr) {
            _retransmission_timeout *= 2;
            _retransmission_count += 1;
        }
        return true;
    }

    void reset(unsigned int timeout) {
        _retransmission_count = 0;
        _passby_time = 0;
        _retransmission_timeout = timeout;
    }
};

class TCPSender {
  private:
    //! our initial sequence number, the number for our SYN.
    WrappingInt32 _isn;

    //! outbound queue of segments that the TCPSender wants sent
    std::queue<TCPSegment> _segments_out{};

    // _outstanding segments. Segments that sent but not commited.
    std::queue<std::pair<uint64_t, TCPSegment>> _outstanding_segments{};

    // Sent but not acknowledged bytes.
    uint64_t _bytes_in_flight{0};

    //! retransmission timer for the connection
    unsigned int _initial_retransmission_timeout;

    //! outgoing stream of bytes that have not yet been sent
    ByteStream _stream;

    //! the (absolute) sequence number for the next byte to be sent
    uint64_t _next_seqno{0};

    // Left window size
    uint16_t _window_size{1};

    // Timer for retransmission.
    Timer _timer{};

    // Sent fin flag.
    bool _sent_fin{false};

... ...
}
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity) {
    _timer.set_retrans_timeout(_initial_retransmission_timeout);
}

uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }

void TCPSender::fill_window() {
    uint16_t left_window_size = std::max(std::uint16_t(1), _window_size);
    left_window_size = left_window_size > _bytes_in_flight ? left_window_size - _bytes_in_flight : 0;

    while (left_window_size > 0) {
        uint16_t remain_seg_length = left_window_size;
        TCPSegment tcp;
        // Set TCP header.
        tcp.header().seqno = next_seqno();
        if (_next_seqno == 0 && remain_seg_length > 0) {
            _timer.reset(_initial_retransmission_timeout);
            _timer.open();
            tcp.header().syn = 1;
            remain_seg_length -= 1;
        }
        // Set TCP Payload.
        if (remain_seg_length > 0) {
            size_t payload_limit = std::min(static_cast<std::uint16_t>(TCPConfig::MAX_PAYLOAD_SIZE), remain_seg_length);
            tcp.payload() = Buffer(_stream.peek_output(payload_limit));
            _stream.pop_output(payload_limit);
            remain_seg_length -= tcp.payload().size();
        }
        // set fin.
        if (remain_seg_length > 0 && _stream.buffer_size() == 0 && _stream.eof() && !_sent_fin) {
            tcp.header().fin = 1;
            _sent_fin = true;
            remain_seg_length -= 1;
        }
        // Update seg_length.
        size_t seg_length = tcp.length_in_sequence_space();
        left_window_size =
            std::max(static_cast<std::uint16_t>(0), static_cast<std::uint16_t>(left_window_size - seg_length));
        // Insert tcp.
        if (seg_length > 0) {
            _segments_out.push(tcp);
            _outstanding_segments.emplace(_next_seqno, tcp);
            _bytes_in_flight += seg_length;
            // Update _next_seqno
            _next_seqno += seg_length;
        } else {
            break;
        }
    }
}

//! param ackno The remote receiver's ackno (acknowledgment number)
//! param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    uint64_t abs_ackno = unwrap(ackno, _isn, _next_seqno);

    if (abs_ackno > _next_seqno) {
        // Impossible ack.
        return;
    }

    _window_size = window_size;
    bool ack = false;

    while (!_outstanding_segments.empty()) {
        auto &p = _outstanding_segments.front();
        if (p.first + p.second.length_in_sequence_space() - 1 < abs_ackno) {
            _bytes_in_flight -= _outstanding_segments.front().second.length_in_sequence_space();
            _outstanding_segments.pop();
            ack = true;
        } else {
            break;
        }
    }

    if (ack) {
        _timer.reset(_initial_retransmission_timeout);
    }
}

//! param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    if (_timer.check_timeout(ms_since_last_tick, _window_size != 0) && !_outstanding_segments.empty()) {
        _segments_out.push(_outstanding_segments.front().second);
    }
}

unsigned int TCPSender::consecutive_retransmissions() const { return _timer.get_retransmission_count(); }

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = next_seqno();
    _segments_out.emplace(std::move(seg));
}

输入 make check_lab3 即可通过全部测试。

请添加图片描述

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。