Lab 1: Stitching Substrings into a Byte Stream

重组器

经过了 check0 的洗礼,大概知道课程的 Coding 难度了。Docs 特别强调,无需修改除了特别说明的文件以外的文件。也就是 CMake -tidy 提示大可以不用修改。

1. Docs

下面的 Lab 我们会逐步完善我们的 TCP 构造。

1.1. 我是谁?我在哪?我为什么要写 TCP?

⋆Why am I doing this?

Providing a service or an abstraction on top of a different less-reliable service accounts for many of the interesting problems in networking. Over the last 40 years, researchers and practitioners have figured out how to convey all kinds of things—messaging and e-mail, hyperlinked documents, search engines, sound and video, virtual worlds, collaborative file sharing, digital currencies—over the Internet. TCP’s own role, providing a pair of reliable byte streams using unreliable datagrams, is one of the classic examples of this. A reasonable view has it that TCP implementations count as the most widely used nontrivial computer programs on the planet.

1.2. Hands-on

今年 CS144 新增了一些动手实现的组件...希望文档交代的清楚一点...不然网上连代码都找不到...

1.3. Getting started

先 fetch 一下 check 1 的其实代码哈。这里我直接把官方的代码 fork 下来了(防止官方关闭...)

但是我其实不知道到时候怎么 merge 我 fork 的 rep...到时候再说吧...见一步行一步...

2. Hands-on component: a private network for the class

班级的私有网络。CS144 是不是不希望外校的学生写 Lab啊...总是需要内校合作...

总的来说是测试和同学之间的 ping 连接...

  1. 使用 sudo apt install wireguard 安装程序

  2. https://cs144.net/wg 跟着操作...但是根本登录不上...没有 Stanford 账号

  3. 跟着 Page ping 一下

2.1. Ping a friend and look at the datagrams

  1. 使用 sudo apt install wireshark 安装程序

  2. 找同学要 IP address...

  3. 具体操作

    1. You can end the “ping” program by typing ctrl-C.

    2. You can make the “ping” command go faster by including the argument -i 0.2 . This will make it send an “echo request” every 0.2 seconds (5 times per second).

    3. You can make the “ping” command print out a summary of the statistics so far (without ending it) by running this command in another terminal: killall-QUIT ping

  4. 然后写报告...这里跳过了,具体看 Lab,问题还算比较清晰的...

2.2. Send an Internet datagram by hand

发送一个数据报.

还是需要同学...

在 apps/ip_raw.cc 文件中,编写一个程序,使用原始套接字向你的朋友发送一个互联网数据报,方法与 1 月 10 日讲座中相同。可以参考讲座中的代码进行修改。

  1. 向你的小组成员发送一个 IP 协议为“5”的互联网数据报(你需要使用 sudo 来运行 ./build/apps/ip_raw 程序),并让你的朋友使用 tcpdump 确保他们收到了数据报。他们可以运行 sudo tcpdump -n -i wg0 'proto 5' 来仅打印协议为“5”的数据报。确保他们收到它!

  2. 向你的小组成员发送一个用户数据报(IP 协议为“17”),使用 https://www.rfc-editor.org/rfc/rfc768 中的“用户数据报”头部格式。你的朋友可以在不使用 sudo 的情况下接收这个数据报。他们可以使用讲座中提到的 nc -u 程序,或者使用 C++ 程序和 UDPSocket 类——他们可以自行选择!

  3. 将你的 ip_raw.cc 程序代码包含在提交到这个检查点的内容中。

  4. 同样,反过来接收你朋友发送给你的数据报。

这里仍然跳过这部分实现了,不太好参照标准,后续看看有无大佬有现成代码,参考参考。

3. Implementation: putting substrings in sequence

这部分终于可以和前面对接上来了,写一个 TCP 接收器。

该模块接收数据报,并将其转换为可靠的字节流,以便应用程序从套接字读取——就像你在 Checkpoint 0 中的 webget 程序从 web 服务器读取字节流一样。

TCP 发送方将字节流分成较短的段(每个子字符串最多约 1,460 字节),以便它们都能装入一个数据报中。但网络可能会对这些数据报进行重新排序,或者丢弃它们,或者多次传递它们。接收器必须将这些段重新组装成它们最初的样子,即连续的字节流。

在本实验中,你将编写负责这种重新组装的数据结构:一个重组器(Reassembler)。它将接收子字符串,包括一个字节字符串,以及该字符串在更大流中的第一个字节的索引。流中的每个字节都有其唯一的索引,从零开始并向上计数。一旦重组器知道流中的下一个字节,它就会将其写入字节流的写入端(Writer)——这与你在 Checkpoint 0 中实现的字节流相同。重组器的“客户”可以从同一个字节流的读取端(Reader)读取。

它的接口如下:

// 将一个新的子字符串插入到字节流中进行重组。
void insert(uint64_t first_index, std::string data, bool is_last_substring);

// 重组器本身存储了多少字节?
// 该函数仅用于测试;不要添加额外的状态来支持它。
uint64_t count_bytes_pending() const;

// 访问输出流的读取端。
Reader& reader();

为什么要做这个?TCP 对乱序和重复的鲁棒性来自于它能够将任意片段重新拼接回原始字节流。将其实现为一个独立的、可测试的模块,将使处理传入段变得更加容易。

完整的(公开的)重组器接口由reassembler.hh头文件中的Reassembler类描述。你的任务是实现这个类。你可以随意向Reassembler类中添加任何私有成员和成员函数,但不能更改其公开接口。

3.1. What should the Reassembler store internally?

insert方法会通知重组器关于字节流的一个新片段,以及它在整体流中的位置(子字符串的起始索引)。

原则上,重组器需要处理以下三类知识:

  1. 流中的下一个字节。一旦重组器知道了这些字节,就应该立即将它们推送到流中(输出的.writer())。

  2. 符合流可用容量但还不能被写入的字节,因为更早的字节仍然未知。这些字节应该在重组器内部存储。

  3. 超出流可用容量的字节。这些字节应该被丢弃。重组器不会存储任何无法被推送到字节流的字节,无论是立即推送,还是在更早的字节被知晓后推送。

这种行为的目标是限制重组器和字节流使用的内存量,无论传入的子字符串如何到达。我们在下图中展示了这一点。容量是一个上限,限制了:

  1. 重组后的字节流中缓冲的字节数(图中绿色部分)。

  2. “未组装”子字符串可以使用的字节数(图中红色部分)。

3.2. FAQs

  • 整个流的第一个字节的索引是多少?

    • 零。

  • 我的实现应该有多高效?

    • 数据结构的选择在这里也很重要。

    • 请不要将这视为构建极其低效(空间或时间)数据结构的挑战——重组器将是你的 TCP 实现的基础。

    • 我们为你提供了一个基准测试;

    • 任何大于 0.1 Gbit/s(100 Mbps)的性能都是可以接受的。顶级的重组器可以达到 10 Gbit/s。

  • 如何处理不一致的子字符串?

    • 你可以假设它们不存在。

    • 也就是说,你可以假设存在一个唯一的底层字节流,所有子字符串都是它的(准确的)片段。

  • 我可以使用什么?

    • 你可以使用标准库中的任何部分。特别是,我们期望你至少使用一种数据结构。

  • 字节何时应该写入流中?

    • 尽可能早。

    • 唯一一种字节不应该在流中的情况是:

      • 当它之前还有字节尚未被“推送”时。

  • 提供给insert()函数的子字符串可能会重叠吗?

    • 是的。

  • 我是否需要向重组器中添加私有成员?

    • 是的。

    • 子字符串可能会以任何顺序到达,因此你的数据结构需要“记住”子字符串,直到它们准备好被放入流中——也就是说,直到它们之前的所有索引都被写入。

  • 重组器的数据结构是否可以存储重叠的子字符串?

    • 不可以。

    • 虽然可以实现一个“接口正确”的重组器来存储重叠的子字符串,但允许重组器这样做会破坏“容量”作为内存限制的概念。

    • 如果调用者提供了关于相同索引的冗余知识,重组器应该只存储这些信息的一个副本。

  • 重组器会使用字节流的读取端(Reader)吗?

    • 不会——那是留给外部客户的。

    • 重组器只使用写入端。

  • 我们期望的代码量是多少?

    • 当我们运行./scripts/lines-of-code对起始代码进行统计时,它显示:

      • ByteStream: 82 行代码

      • Reassembler: 26 行代码

    • 当我们对我们的解决方案运行时,它显示:

      • ByteStream: 111 行代码

      • Reassembler: 85 行代码

    • 因此,一个合理的重组器实现可能在起始代码的基础上增加 50-60 行代码。

3.3. Code Implement

文档的描述还算清晰,这次的任务代码量around 100 lines。需要实现一种能实现接口且带宽尽可能大的数据结构。这里我还是认为自己不大能独立完成这个任务...策略是这样,先结合 kimi 和自己的理解进行 first coding...然后结合 Rose 大佬的源码 second coding,最后总结这次代码的一些逻辑完成 third coding...

这个debug.hh到达是什么东西...

路径根本不对,不知道start code里为什么要放上这个...红色波浪线太掉兴致了。无语,现在先删了。

Kimi 还是不大好用哇

AI 目前还是比较吃操作的...一些傻瓜式操作已经不能完成这种带有一点复杂的代码了,特别一份代码有多个文件。现在转过去理解 rose 大佬的思路,目前把 .hh 写完了(抄完了),大概是知道了对数据的操作,代码总量还是有点吓人了,继续看。

原来真的是层层递进的

这次的 lab 会用到上次用到的 bytestream 类...用到一些成员方法...

大佬是真的强

用到了好多C++新标准的内容,看来是掌握得很好吧...诸如:

  • if 结合 unlikely

  • move 移动语义

  • auto 还可以像 matlab 一样 [a, b] 返回两个参数呢?

  • exchange 又是什么玩意

  • std::erase_if 甚至是 C++20 的标准...新东西

  • 果然 CSDIY 评价 CS144 为一门进阶 C++ 的课程...

  • contains...

  • 新标准好多都是报错的...不知道是不是我的 compiler version too low...

  • 在虚拟机里可以运行,而且测试用例还通过了,虽然只通过了一个

第一版代码,花了一晚上...

...完全是盲人摸象...但好过刷短视频吧,看着200来行代码还是很有成就感的,虽然全是抄的,但我理解了,就是我的了...

reassembler.hh

#pragma once

#include "byte_stream.hh"
#include <map>
#include <optional>

class Reassembler
{
public:
  // Construct Reassembler to write into given ByteStream.
  explicit Reassembler(ByteStream &&output) : output_(std::move(output)) {}

  /*
   * Insert a new substring to be reassembled into a ByteStream.
   *   `first_index`: the index of the first byte of the substring
   *   `data`: the substring itself
   *   `is_last_substring`: this substring represents the end of the stream
   *   `output`: a mutable reference to the Writer
   *
   * The Reassembler's job is to reassemble the indexed substrings (possibly out-of-order
   * and possibly overlapping) back into the original ByteStream. As soon as the Reassembler
   * learns the next byte in the stream, it should write it to the output.
   *
   * If the Reassembler learns about bytes that fit within the stream's available capacity
   * but can't yet be written (because earlier bytes remain unknown), it should store them
   * internally until the gaps are filled in.
   *
   * The Reassembler should discard any bytes that lie beyond the stream's available capacity
   * (i.e., bytes that couldn't be written even if earlier gaps get filled in).
   *
   * The Reassembler should close the stream after writing the last byte.
   */
  void insert(uint64_t first_index, std::string data, bool is_last_substring);

  // How many bytes are stored in the Reassembler itself?
  // This function is for testing only; don't add extra state to support it.
  uint64_t count_bytes_pending() const;

  // Access output stream reader
  Reader &reader() { return output_.reader(); }
  const Reader &reader() const { return output_.reader(); }

  // Access output stream writer, but const-only (can't write from outside)
  const Writer &writer() const { return output_.writer(); }

private:
  void insert_or_store(uint64_t first_index, std::string data);
  void write_stored_str();
  void write(std::string data);
  void store(uint64_t first_index, std::string data);
  uint64_t truncate_head(uint64_t old_index, std::string &data);

private:
  ByteStream output_; // reassembler writes to this bytestream
  std::map<uint64_t, std::string> pending_substr_{};
  uint64_t bytes_pending_{};
  uint64_t next_index_{};
  std::optional<uint64_t> total_pushed_len_{std::nullopt};
};

reassembler.cc

#include "reassembler.hh"
#include <iostream>
#include <utility>
#include <vector>

using namespace std;

void Reassembler::insert(uint64_t first_index, string data, bool is_last_substring)
{
  if (is_last_substring) //[[_UNLIKELY]]
  {
    total_pushed_len_ = first_index + data.length();
  }
  insert_or_store(first_index, std::move(data));
  write_stored_str();
  if (total_pushed_len_.has_value() && output_.writer().bytes_pushed() == *total_pushed_len_)
  {
    output_.writer().close();
  }
}

void Reassembler::insert_or_store(uint64_t first_index, std::string data)
{
  if (first_index < next_index_)
  {
    // update first_index...
    first_index = truncate_head(first_index, data);
  }
  // why?
  if (first_index > next_index_)
  {
    store(first_index, std::move(data));
  }
  else
  {
    write(std::move(data));
  }
}
void Reassembler::write_stored_str()
{
  for (auto &[first_index, data] : pending_substr_)
  {
    if (first_index <= next_index_)
    {
      auto buf = std::exchange(data, "");
      bytes_pending_ -= buf.length();
      insert_or_store(first_index, std::move(buf));
    }
  }
  // lambda expression depends what to erase...
  std::erase_if(pending_substr_, [](const auto &elem)
                { return elem.second.empty(); });
}

void Reassembler::write(std::string data)
{
  output_.writer().push(std::move(data));
  next_index_ = output_.writer().bytes_pushed();
}

void Reassembler::store(uint64_t first_index, std::string data)
{
  if (auto len = output_.writer().available_capacity() - (first_index - next_index_); data.length() >= len)
  {
    data.erase(data.begin() + len, data.end());
  }
  if (data.empty()) //[[__UNLIKELY]]
  {
    return;
  }
  if (pending_substr_.empty()) //[[__UNLIKELY]]
  {
    bytes_pending_ += data.length();
    pending_substr_.emplace(first_index, std::move(data));
  }
  else
  {
    auto final_index = first_index + data.length() - 1;

    if (pending_substr_.contains(first_index))
    {
      if (pending_substr_[first_index].length() >= data.length())
      {
        return;
      }
      auto mapped_data = std::exchange(pending_substr_[first_index], "");
      bytes_pending_ -= mapped_data.length();
      pending_substr_.erase(first_index);
    }

    std::erase_if(pending_substr_, [&](const auto &node)
                  {
      if(node.first >= first_index && node.first + node.second.length() - 1 <= final_index)
      {
        bytes_pending_ -= node.second.length();
        return true;
      }
      return false; });

    for (const auto &[idx, str] : pending_substr_)
    {
      if (first_index >= idx && final_index <= idx + str.length() - 1)
      {
        return;
      }
    }

    bytes_pending_ += data.length();
    pending_substr_.emplace(first_index, std::move(data));

    auto begin_node = pending_substr_.lower_bound(first_index);
    auto end_node = pending_substr_.upper_bound(final_index);
    if (begin_node != std::begin(pending_substr_))
    {
      begin_node = std::prev(begin_node);
    }
    for (auto node = begin_node; std::next(node) != end_node; ++node)
    {
      auto next_node = std::next(node);
      auto this_final_index = node->first + node->second.length() - 1;
      auto next_final_index = next_node->first;
      if (this_final_index >= next_final_index) //[[_UNLIKELY]]
      {
        auto len = this_final_index - next_final_index + 1;
        bytes_pending_ -= len;
        node->second.erase(node->second.begin() + node->second.length() - len, node->second.end());
      }
    }
  }
}

uint64_t Reassembler::truncate_head(uint64_t old_index, std::string &data)
{
  data.erase(0, next_index_ - old_index);
  return next_index_;
}

// How many bytes are stored in the Reassembler itself?
// This function is for testing only; don't add extra state to support it.
uint64_t Reassembler::count_bytes_pending() const
{
  return {};
}

需要进一步理清函数、变量之间的关系,理解一下思路。

拼尽全力,无法理解...

看看B站有没有好点的思路讲解教程,rose 大佬的思路太难懂了...而且代码量巨大。

exchange()方法

  • 简洁:它用一行代码完成了“保存旧值 + 更新新值”的操作,比手动写代码更简洁。

  • 线程安全:td::exchange是原子操作,适合在多线程环境中使用。

  • 避免临时变量:手动交换值时,通常需要一个临时变量,而std::exchange内部已经处理好了

真的是炫技了

用了太多看不懂的东西了,总的来说都是封装再封装了,而我一开始学的基本都是99标准之类的...对于这些封装的东西不是很懂,而且不知道为什么,封装了之后,好像性能还更好了。

想放弃了

这个作业真的太难了!对于我来说,虽然我自己知道,能把这个骨头啃下来,对我的提升绝对质的飞跃...

但是

  1. 由于是计算机网络相关的实验,和网络的干系绝对大...

  2. 加上我的基础其实根本不牢固...我觉得还是要先刷一遍视频,然后再来补

  3. 加上这几天 图灵院 一直在筹备项目,过两天就要开始立项了...

  4. 然后发现这个CSDIY的LAB还是比较吃电脑配置的...有点想装双系统或者买一个配置linux的...

  5. 现在目前的计划还是 视频照样刷...然后 实验在 Git 找了一个大佬的实现 2024 winter 的代码

  6. 总的来说就是一句吧,干啊!

  7. 我还是比较喜欢短时间高强度的训练...虽然我觉得坚持不是坏事...但是计划拖得太久,就容易松懈...

自己还是太菜了,加上吃网络,复制下来的代码都不能 100% 通过test,但是代码别人能够passed...所以我打算研读代码也就算了。学一些接口就好。

4. Development and debugging advice

  1. 编译代码后,可以通过运行cmake --build build --target check1来测试代码。

  2. 遵循现代 C++ 风格,可能会使用到 move() 来传递无法被复制的对象。

  3. 剩下的就是提交代码吧。C++

Last updated