计算机网络实验3.2-基于UDP服务设计可靠传输协议(流量控制)

计算机网络实验3.2-基于UDP服务设计可靠传输协议(流量控制)

写在前面:这次由于突然发烧隔离(虽然不是阳,也是挺重的一次感冒)导致没能及时检查作业,比较可惜。也借此提醒自己:一定要注意身体。另外也得接受教训。如金哥所言,提高应急处突能力还是非常重要的。

实验要求

在实验3-1的基础上,将停等机制改成基于滑动窗口的流量控制机制,采用固定窗口大小,支持累积确认,完成给定测试文件的传输。

  • 多个序列号;
  • 发送缓冲区、接受缓冲区;
  • 滑动窗口:Go Back N;
  • 有必要日志输出(须显示传输过程中发送端、接收端的窗口具体情况)。

程序流程展示

注:与实验3.1相同的部分仅作简要叙述,详细可以参见计算机网络实验实验3.1

协议设计

基于rdt3.0,本次实验实现了GBN和SR两种流水线协议,并采用多线程编程。

报文结构

image-20221210213415264

如图所示,报文头长度共128Bits。下面介绍报文结构如下所示:

整个实验只使用一个序列号字段。对于发送端对应TCP中的seq,接收端对应TCP中的ack

下面是十六位校验和以及数据报字段长度,与TCP相同。

使用u_short来存放flag。其字段含义如下:

F:FIN

S:SYN

A:ACK

H:FILE_HEAD

FILE_HEAD用于指示接收端此报文包含文件信息的字段。

window_size存放接收端通告给发送端的窗口大小。

option为可选字段,在本次实验中暂时用于存放文件长度。

data的最大长度可以调节,本次实验定义为1024字节。

此部分定义代码段如下;

c++
#define MAX_SIZE 1024
#define DATA 0x0
#define FIN 0x1
#define SYN 0x2
#define ACK 0x4
#define ACK_SYN 0x6
#define ACK_FIN 0x5
#define FILE_HEAD 0x8
// datagram format:
#pragma pack(1)
struct packet_head {
    u_int seq;
    u_short check_sum;
    u_short data_size;
    u_short flag;
    u_short window_size;
    u_int option;

    packet_head() {
        seq = 0;
        check_sum = 0;
        data_size = 0;
        flag = 0;
        window_size = 0;
        option = 0;
    }
};

struct packet {
    packet_head head;
    char data[MAX_SIZE]{};

    packet() {
        packet_head();
        memset(data, 0, MAX_SIZE);
    }
};
#pragma pack()

#pragma pack(1)用于指示结构体内容按1Byte对齐,以保证报文大小是我们期望的紧凑形式。

建连和断连

image-20221119154308774

建连和断连过程与3.1无太大变化。主要是在建连过程中增加了接收方初始窗口大小的通告。

流程设计

程序支持一次建连发送多个文件。

本次实验与上次相比,整体逻辑没有变化,但序列号递增使用。在本次实验中序列号为u_int类型,可存储\(2^{32}\)个序列号,每个数据包最大为\(1024\)\(2^{10}\)字节,理论上最大可传输\(2^{42}=4TB\)的单个文件。当然,由于在本实验中因为使用option字段存储文件长度,所以实际上能传输的最大长度是\(2^{32}\)字节即即\(4GB\)。总之,在不考虑可拓展性的前提下,不需循环使用序列号即可满足目前文件传输的需求。

程序代码解释

文件发送过程

GBN

下面先以基础的GBN为基础分析程序代码:

发送端
image-20221210115848469
image-20221210115903632

课件中讲述也很直观,对照伪代码实现即可。

发送进程(主进程)
c++
//wasted space but saved time for "shifting" sndpkt window
auto *sndpkt = new packet[pkt_total + 1];
while (base < pkt_total) {
    //send packets
    if (nextseqnum < base + N && nextseqnum < pkt_total) {
        pkt_data_size = min(MAX_SIZE, file_len - nextseqnum * MAX_SIZE);
        sndpkt[nextseqnum] = make_pkt(DATA, nextseqnum, pkt_data_size, file_data + nextseqnum * MAX_SIZE);
        udt_send(sndpkt[nextseqnum]);
        cout << "Sent packet " + to_string(nextseqnum) + " ";
        if (base == nextseqnum) {
            timer.start_timer();
        }
        nextseqnum++;
        print_window();
    }
    //handle timeout
    if (timer.timeout()) {
        print_message("Timeout, resend packets from " + to_string(base) + " to " + to_string(nextseqnum - 1),
                      WARNING);
        for (u_int i = base; i < nextseqnum; i++) {
            udt_send(sndpkt[i]);
        }
        timer.start_timer();
    }
}

这一部分逻辑与状态机中右上角两个部分完全一致。其中timer是一个全局的计时器,为自己编写的类。用法可以顾名思义。print_window();按照[base|nexeseqnum|base+n]的格式将当前窗口状态打印出来。

接收进程

接收进程在发送之前创建。实现如下:

c++
DWORD WINAPI handle_ACK(LPVOID lpParam) {
    packet rcvpkt;
    while (true) {
        //non-blocking rdt_rcv (if not rcv any packet it, it will return 0)
        while (!rdt_rcv(rcvpkt) || corrupt(rcvpkt) || !isACK(rcvpkt)) {
            //the packet must be ACK and not corrupt to jump out of the loop
        }
//        base = get_ack_num(rcvpkt) + 1;
        acked[get_ack_num(rcvpkt)] = true;
        while (acked[base]) {
            base++;
        }
        cout << "Received ACK " + to_string(get_ack_num(rcvpkt)) + " ";
        print_window();
        if (base == pkt_total) {
            return 0;
        }
        if (base == nextseqnum) {
            timer.stop_timer();
            continue;
        } else {
            timer.start_timer();
        }
    }
}

这一部分助教思考的也非常深入:虽然实验环境下ACK不会丢失且能按序到达,但真实的网络环境下ACK也会丢失,且由于传输速度可能不一样快,ACK未必是按序到达的。这一点在上一次实验的握手建连部分考虑到了且有所叙述(接收方在确认握手成功后也有可能因为ACK丢失而收到发送方重发的握手包),但这一次实验囿于伪代码的惯性思维没有考虑周全,感谢助教提醒指正。

因此此处可参考后续SR的实现,移动时不能简单的移动到当前收到的序号+1,而是从base开始移动到按序收到的包序号之后,即:

c++
//        base = get_ack_num(rcvpkt) + 1;
        acked[get_ack_num(rcvpkt)] = true;
        while (acked[base]) {
            base++;
        }

当然这也需要把收到包的状态保存为数组。此处的空间复杂度还可以优化,也即实际上我们只需要窗口内的ACK状态即可,但这样就伴随着较为费时的“移动”。因此此处直接将所有状态都保存了下来。

接收端

接收端的变化主要一个是将窗口通告给发送方。这个任务只需要修改make_pkt即可,不再赘述。

另外一个是接收发送方数据的逻辑。

image-20221210123705619

代码如下所示:

C++
while (rdt_rcv(rcvpkt)) {
    if (not_corrupt(rcvpkt)) {
        if (hasseqnum(rcvpkt, expectedseqnum)) {
            pkt_data_size = rcvpkt.head.data_size;
            memcpy(file_buffer + received_file_len, rcvpkt.data, pkt_data_size);
            received_file_len += pkt_data_size;
            packet sndpkt = make_pkt(ACK,expectedseqnum);
            udt_send(sndpkt);
            print_message("Received packet " + to_string(expectedseqnum), DEBUG);
            expectedseqnum++;
        } else {
            //discard the packet and wait for the next one
            print_message("Received a out-of-order packet", WARNING);
            continue;
        }
    } else {
        print_message("Received a corrupt packet", DEBUG);
        continue;
    }
    if (received_file_len == file_size) {
        ...file received, writing file to the disk...
    }
}

可以从状态机看到,我们只需要关注expectedseqnum对应的包即可,其他的包收到直接丢弃即可。这一部分并不复杂,对照状态机容易理解。

另外,在状态机中没有”失序重发ACK,确认按序正确接收的最高序号分组“对应的部分。事实上本次实验不重发ACK对正确性没有影响,但其对于拥塞控制来说是必须的。

SR

在这次实验中也额外实现了SR选择重传流水线协议。

image-20221210200312667
image-20221210201321172
发送端

发送端相比GBN,需要为每一个分组都加入定时器。一开始是希望每一个包发出后都创建一个相同进程用于监听ACK并处理超时事件,但实践中发现由于,传入的packet地址会在发送过程中被修改,在线程中无法正确每个线程获取需要等待的分组的序号和内容。因此转而采用线程池的方式进行实现,每个线程维护自己的定时器和需要重发的分组。代码如下所示:

C++
//wasted space but saved time for "shifting" sndpkt window
clock_t single_file_timer = clock();
while (base < pkt_total) {
    //send packets
    if (nextseqnum < base + N && nextseqnum < pkt_total) {
        pkt_data_size = min(MAX_SIZE, file_len - nextseqnum * MAX_SIZE);
        sndpkt_buffer[nextseqnum] = make_pkt(DATA, nextseqnum, pkt_data_size,
                                             file_data + nextseqnum * MAX_SIZE);
        udt_send(sndpkt_buffer[nextseqnum]);
        cout << "Sent packet " + to_string(nextseqnum) + " ";
        print_window();
        thread_pool[nextseqnum] = CreateThread(nullptr, 0, SR, &sndpkt_buffer[nextseqnum], 0, nullptr);
        nextseqnum++;
    }
}

其中SR线程的实现思路如下:

acked数组中保存了当前分组确认的状态,和GBN中所叙述的类似。这个数组的内容由接收线程(与GBN相同,但只标记acked数组状态,不进行窗口滑动)进行修改。当第wait_seq个包对应的SR线程发现监听到了对应acked数组变化(即接收ACK(n))判断是否进行窗口滑动。若超时,则重发对应的包,重启定时器。

下面是对应的代码:

C++
DWORD WINAPI SR(LPVOID lpParam) {
    packet sndpkt = *reinterpret_cast<packet *>(lpParam);
    u_int wait_seq = sndpkt.head.seq;
    int resend_times = 0;
    //start a timer
    clock_t start = clock();
    while (!acked[wait_seq]) {
        if (timeout(start)) {
            udt_send(sndpkt);
            start = clock();
            if (resend_times > MAX_RESEND_TIMES) {
                print_message("Resend times exceed the limit, there must be something wrong with the network", ERR);
                return 1;
            } else {
                cout << "Resend packet " + to_string(sndpkt.head.seq) + " ";
                print_window();
                resend_times++;
            }
        }
    }
    //if reach here, the packet is ACKed
    if (wait_seq == base) {
        //if the ACKed packet is the base, move the window to the first unACKed packet
        while (acked[base]) {
            base++;
        }
    }
    return 0;
}
接收端

接收端比GBN的情况要复杂一些,因为需要缓存失序的包。实现如下:

C++
//"blocking receive" here
while (rdt_rcv(rcvpkt)) {
    if (not_corrupt(rcvpkt)) {
        u_int pkt_seq = rcvpkt.head.seq;
        if (pkt_seq >= rcv_base && pkt_seq < rcv_base + N) {
            //in the window
            if (!acked[pkt_seq]) {
                if (pkt_seq == rcv_base) {
                    //the first packet in the window
                    pkt_data_size = rcvpkt.head.data_size;
                    memcpy(file_buffer + pkt_seq * MAX_SIZE, rcvpkt.data, pkt_data_size);
                    acked[pkt_seq] = true;
                    packet sndpkt = make_pkt(ACK, rcv_base);
                    udt_send(sndpkt);
                    print_message("Received packet " + to_string(pkt_seq), DEBUG);
                    //slide the window
                    while (acked[rcv_base]) {
                        rcv_base++;
                    }
                } else {
                    //not the first packet in the window, cache it
                    pkt_data_size = rcvpkt.head.data_size;
                    memcpy(file_buffer + pkt_seq * MAX_SIZE, rcvpkt.data, pkt_data_size);
                    acked[pkt_seq] = true;
                    packet sndpkt = make_pkt(ACK, pkt_seq);
                    udt_send(sndpkt);
                    print_message("Received packet " + to_string(pkt_seq)+", cached", DEBUG);
                }
            } else {
                //already acked in the window, resend the ack
                print_message("Received packet " + to_string(pkt_seq) + " again", WARNING);
                //send ack
                packet sndpkt = make_pkt(ACK, pkt_seq);
                udt_send(sndpkt);
                print_message("Sent ack " + to_string(pkt_seq), DEBUG);
            }
        } else if ((pkt_seq >= rcv_base - N) && (pkt_seq < rcv_base)) {
            //out of the window, but in the buffer
            print_message("Received packet " + to_string(pkt_seq) + " again", WARNING);
            //send ack
            packet sndpkt = make_pkt(ACK, pkt_seq);
            udt_send(sndpkt);
            print_message("Sent ack " + to_string(pkt_seq), DEBUG);
        } else {
            //out of the window and buffer
            print_message("Received packet " + to_string(pkt_seq) + " out of the window", WARNING);
            //do nothing
        }
    } else {
        print_message("Received a corrupt packet", DEBUG);
        continue;
    }
    if (rcv_base * MAX_SIZE >= file_size) {
        ...file received, writing file to the disk...
    }
}

虽然代码较长,但无非是区分了接收的分组在窗口内,窗口之前或者超出了缓冲区三种情况。

程序演示

建立连接

路由器设置:

image-20221119185527966

流程展示在上一次实验已经展示的比较完善。本次实验主要演示有丢包延时条件下的发送情况。

GBN流水线协议

可以看到发送方当发现超时候会将basenextseqnum之间的包全部重发。

image-20221210212155885

接收方也能判断失序的分组:

image-20221210212321733

SR流水线协议

在SN协议的实践中,发现发送方会经常由于超时重发接收方能够收到的包。猜测是由于接收方串行接收且接收的逻辑比发送方发送的逻辑复杂,导致了接收的速度跟不上发送的速度。

image-20221210211645995
image-20221210211627874

因此尝试略微调大超时时间(3*MAX_TIME)可以看到正常的接收发送过程。

image-20221210211424408
image-20221210211449133

发送成功!

image-20221210212403935

GitHub:

https://github.com/Lunaticsky-tql/rdt_on_udp

GBN对应lab3.2分支,SR对应lab3.2-SR分支。


计算机网络实验3.2-基于UDP服务设计可靠传输协议(流量控制)
http://lunaticsky-tql.github.io/posts/39007/
作者
Lunatic sky
发布于
2022年12月24日
许可协议