CS144 lab3

Lab3的任务是实现一个TCPsender。

1. TCPsender 的功能

    1. 将ByteStream中的数据以TCP报文的形式持续传输给接受者
    1. 根据ackno和windows size,追踪接受者的传输状态,同时检测丢包
    1. 当超出一定的时间后还没有接受到数据,那么进行重新传输

2. project

2.1 bytes_in_flight

表示已经被发出但是没有被接受的字节数
为了满足这个函数,我们需要添加一个成员outgoing_bytes表示这些没有被确认的字节数目

2.2 fill_windows

从我们的_stream中尽可能多地读取字节,当我们的窗口大小大于未被确认的字节,持续进行发送。

在发送的时候,如果没有发送SYN信号,那么此时发送SYN信号,设置segment.head().syn = true

随后设置发送段的seqno和payload, 对于payload的大小,为最大负载和当前窗口大小 - 未被确认的字节 - 是否有syn标志位的较小值。

判断结束的时候,需要满足三个条件

    1. 没有发送过fin标志
    1. 字节全部发送
    1. 剩余的空间还能放下FIN标志

当发送的数据长度为0的时候,停止数据发送,将数据段发送到_segments_out中,同时更新_outgoing中的数据。

2.3 ack_received

获得确认序号,确认此时到达的位置,如果得到的ack大于我们的next_seqno,那么说明得到的ack是错误的。

得到了正确的确认数字,那么和当前的发送列表中的数据进行比较,如果发送信号的结束序号是小于这个确认序号,那么此时该段被确认。

然后去填充窗口

2.4 tick

tick表示过去的调用经历的时间,发送方需要重新发送一些超时且没有被确认的数据包

2.5 send_empty_segment

发送一个空的段。

tcp_sender.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include "tcp_sender.hh"

#include "tcp_config.hh"

#include <random>

// Dummy implementation of a TCP sender

// For Lab 3, please replace with a real implementation that passes the
// automated checks run by `make check_lab3`.

template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}

using namespace std;

//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity) {}

uint64_t TCPSender::bytes_in_flight() const { return _outgoing_bytes; }

void TCPSender::fill_window() {
// 如果远程窗口大小为 0, 则把其视为 1 进行操作
size_t curr_window_size = _last_window_size ? _last_window_size : 1;
// 循环填充窗口
while (curr_window_size > _outgoing_bytes) {
// 尝试构造单个数据包
// 如果此时尚未发送 SYN 数据包,则立即发送
TCPSegment segment;
if (!_set_syn_flag) {
segment.header().syn = true;
_set_syn_flag = true;
}
// 设置 seqno
segment.header().seqno = next_seqno();

// 装入 payload.
const size_t payload_size =
min(TCPConfig::MAX_PAYLOAD_SIZE, curr_window_size - _outgoing_bytes - segment.header().syn);
string payload = _stream.read(payload_size);

/**
* 读取好后,如果满足以下条件,则增加 FIN
* 1. 从来没发送过 FIN
* 2. 输入字节流处于 EOF
* 3. window 减去 payload 大小后,仍然可以存放下 FIN
*/
if (!_set_fin_flag && _stream.eof() && payload.size() + _outgoing_bytes < curr_window_size)
_set_fin_flag = segment.header().fin = true;

segment.payload() = Buffer(move(payload));

// 如果没有任何数据,则停止数据包的发送
if (segment.length_in_sequence_space() == 0)
break;

// 如果没有正在等待的数据包,则重设更新时间
if (_outgoing_map.empty()) {
_timeout = _initial_retransmission_timeout;
_timecount = 0;
}

// 发送
_segments_out.push(segment);

// 追踪这些数据包
_outgoing_bytes += segment.length_in_sequence_space();
_outgoing_map.insert(make_pair(_next_seqno, segment));
// 更新待发送 abs seqno
_next_seqno += segment.length_in_sequence_space();

// 如果设置了 fin,则直接退出填充 window 的操作
if (segment.header().fin)
break;
}
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
size_t abs_seqno = unwrap(ackno, _isn, _next_seqno);
// 如果传入的 ack 是不可靠的,则直接丢弃
if (abs_seqno > _next_seqno)
return;
// 遍历数据结构,将已经接收到的数据包丢弃
for (auto iter = _outgoing_map.begin(); iter != _outgoing_map.end();) {
// 如果一个发送的数据包已经被成功接收
const TCPSegment &seg = iter->second;
if (iter->first + seg.length_in_sequence_space() <= abs_seqno) {
_outgoing_bytes -= seg.length_in_sequence_space();
iter = _outgoing_map.erase(iter);

// 如果有新的数据包被成功接收,则清空超时时间
_timeout = _initial_retransmission_timeout;
_timecount = 0;
}
// 如果当前遍历到的数据包还没被接收,则说明后面的数据包均未被接收,因此直接返回
else
break;
}
_consecutive_retransmissions_count = 0;
// 填充后面的数据
_last_window_size = window_size;
fill_window();
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_timecount += ms_since_last_tick;

auto iter = _outgoing_map.begin();
// 如果存在发送中的数据包,并且定时器超时
if (iter != _outgoing_map.end() && _timecount >= _timeout) {
// 如果窗口大小不为0还超时,则说明网络拥堵
if (_last_window_size > 0)
_timeout *= 2;
_timecount = 0;
_segments_out.push(iter->second);
// 连续重传计时器增加
++_consecutive_retransmissions_count;
}
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions_count; }

void TCPSender::send_empty_segment() {
TCPSegment segment;
segment.header().seqno = next_seqno();
_segments_out.push(segment);
}

tcp_sender.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH
#define SPONGE_LIBSPONGE_TCP_SENDER_HH

#include "byte_stream.hh"
#include "tcp_config.hh"
#include "tcp_segment.hh"
#include "wrapping_integers.hh"

#include <functional>
#include <queue>
#include <map>

//! \brief The "sender" part of a TCP implementation.

//! Accepts a ByteStream, divides it up into segments and sends the
//! segments, keeps track of which segments are still in-flight,
//! maintains the Retransmission Timer, and retransmits in-flight
//! segments if the retransmission timer expires.
class TCPSender {
private:
int _timeout{-1};
int _timecount{0};

std::map<size_t, TCPSegment> _outgoing_map{};
size_t _outgoing_bytes{0};

size_t _last_window_size{1};
bool _set_syn_flag{false};
bool _set_fin_flag{false};
size_t _consecutive_retransmissions_count{0};

//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;

//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};

//! 等待返回接收成功的段
std::queue<TCPSegment> _segments_wait{};

//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;

//! RTO 重传时间
unsigned int retransmission_time = 0;

// ! 重传次数
uint16_t _consecutive_retransmissions = 0;

//! 重传计时器
size_t retransmissions_timer = 0;
//! 重传计时器是否启动
bool retransmissions_timer_running = false;

//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;

//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};
uint64_t _recv_seqno{0};

// 标志开始和结束
bool syn = false;
bool fin = false;

// 在发送时的数据大小
uint64_t bytes_in_flight_ = 0;

// 接收窗口的大小
uint16_t windows_size = 0;

public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});

//! \name "Input" interface for the writer
//!@{
ByteStream &stream_in() { return _stream; }
const ByteStream &stream_in() const { return _stream; }
//!@}

//! \name Methods that can cause the TCPSender to send a segment
//!@{
void send_tcpsegment(TCPSegment &segment);

//! \brief A new acknowledgment was received
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);

//! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
void send_empty_segment();

//! \brief create and send segments to fill as much of the window as possible
void fill_window();

//! \brief Notifies the TCPSender of the passage of time
void tick(const size_t ms_since_last_tick);
//!@}

//! \name Accessors
//!@{

//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
//! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
//! (see TCPSegment::length_in_sequence_space())
size_t bytes_in_flight() const;

//! \brief Number of consecutive retransmissions that have occurred in a row
unsigned int consecutive_retransmissions() const;

//! \brief TCPSegments that the TCPSender has enqueued for transmission.
//! \note These must be dequeued and sent by the TCPConnection,
//! which will need to fill in the fields that are set by the TCPReceiver
//! (ackno and window size) before sending.
std::queue<TCPSegment> &segments_out() { return _segments_out; }
//!@}

//! \name What is the next sequence number? (used for testing)
//!@{

//! \brief absolute seqno for the next byte to be sent
uint64_t next_seqno_absolute() const { return _next_seqno; }

//! \brief relative seqno for the next byte to be sent
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
//!@}

// uint64_t bytes_in_flight_ = 0;
// // 接收方的窗口大小

uint16_t receiver_window_size_ = 0;
};

#endif // SPONGE_LIBSPONGE_TCP_SENDER_HH