这里回顾CS 144 Lab 1: stitching substrings into a byte stream。

实验资料:

参考资料:

Lab 1: stitching substrings into a byte stream

在该实验中:我们将实现一个流重组器——一个将字节流的小片段(称为子串,或段)按正确的顺序缝合成连续的字节流的模块。

准备工作

下载代码以及跑通流程

git clone https://gitee.com/kangyupl/sponge
git checkout -b lab1-startercode origin/lab1-startercode
mkdir build && cd build
cmake ..
make format
make -j4 && make check_lab1 

接口

// Construct a `StreamReassembler` that will store up to `capacity` bytes.
StreamReassembler(const size_t capacity);
// Receive a substring and write any newly contiguous bytes into the stream.
//
// `data`: the segment
// `index` indicates the index (place in sequence) of the first byte in `data`
// `eof`: the last byte of this segment is the last byte in the entire stream
void push_substring(const string &data, const uint64_t index, const bool eof);
// Access the reassembled byte stream
ByteStream &stream_out();
// The number of bytes in the substrings stored but not yet reassembled
size_t unassembled_bytes() const;
// Is the internal state empty (other than the output stream)?
bool empty() const;

说明

  • 整个数据流中第一个字节的索引是什么?
    • 0。
  • 我的实现应该有多大的效率?
    • 我们还不打算指定一个效率的概念,但请不要建立一个严重影响空间或时间的数据结构——这个数据结构将是你的TCP实现的基础。
  • 应该如何处理不一致的子串?
    • 你可以假设它们不存在。也就是说,你可以假设有一个唯一的底层字节流,而所有的子串都是它的(精确)片段。
  • 我可以使用什么?
    • 你可以使用你认为有用的标准库的任何部分。特别是,我们希望你至少要使用一个数据结构。
  • 字节什么时候应该被写入流中?
    • 越快越好。一个字节不应该出现在流中的唯一情况是,在它之前有一个字节还没有被”push”。
  • 子串可能重叠吗?
    • 可能。
  • 我是否需要向StreamReassembler添加私有成员?
    • 是的。由于段可能以任何顺序到达,你的数据结构将不得不记住子串,直到它们准备好被放入流中,也就是说,直到它们之前的所有索引都已填充。

代码

说明:

  • 使用双向链表作为缓存区,链表中的元素根据index递增的顺序排列;
  • 任意两个缓存块之间没有重合;

代码:

stream_reassembler.hh:

#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

#include "byte_stream.hh"

#include <cstdint>
#include <string>
#include <vector>
#include <list>
#include <utility> 

//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
/**
 * Buffer数据结构, 用来缓存已到达, 但没有被push的部分, 
 * 每个部分为(string, index), 利用list存储, 按照index递增的顺序存储。
 */
class Buffer {
  private:
    std::list<std::pair<std::string, uint64_t>> buf = {};
    size_t size = 0;
  public:
    // 获得缓存大小
    size_t get_size() const;
    // 插入
    void insert(const std::string &data, const uint64_t index);
    // 合并list中重叠部分
    void merge();
    // 返回index最小的元素
    std::pair<std::string, uint64_t> front();
    // 弹出元素, 如果flag为true, 则更新大小
    void pop(bool flag);
    // 判断buffer是否为空
    bool is_empty();
    // 打印
    void print_buffer();
};

class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.
    Buffer buf;
    // 是否为eof
    bool eof_flag = false;
    // 下一个被push的字符索引
    uint64_t _index = 0;
    // 最后一个被push的字符
    std::string last_str = "";
    ByteStream _output;  //!< The reassembled in-order byte stream
    size_t _capacity;    //!< The maximum number of bytes

  public:
    //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
    //! \note This capacity limits both the bytes that have been reassembled,
    //! and those that have not yet been reassembled.
    StreamReassembler(const size_t capacity);

    //! \brief Receives a substring and writes any newly contiguous bytes into the stream.
    //!
    //! If accepting all the data would overflow the `capacity` of this
    //! `StreamReassembler`, then only the part of the data that fits will be
    //! accepted. If the substring is only partially accepted, then the `eof`
    //! will be disregarded.
    //!
    //! \param data the string being added
    //! \param index the index of the first byte in `data`
    //! \param eof whether or not this segment ends with the end of the stream
    void push_substring(const std::string &data, const uint64_t index, const bool eof);

    bool push_substring(const std::string &data, const uint64_t index);

    //! \name Access the reassembled byte stream
    //!@{
    const ByteStream &stream_out() const { return _output; }
    ByteStream &stream_out() { return _output; }
    //!@}

    Buffer &buffer_out() { return buf; }

    //! The number of bytes in the substrings stored but not yet reassembled
    //!
    //! \note If the byte at a particular index has been submitted twice, it
    //! should only be counted once for the purpose of this function.
    // 缓存的字符串数量
    size_t unassembled_bytes() const;

    //! \brief Is the internal state empty (other than the output stream)?
    //! \returns `true` if no substrings are waiting to be assembled
    bool empty() const;

    int get_index() { return _index; };
};

#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

stream_reassembler.cc:

#include "stream_reassembler.hh"

#include <iostream>
// Dummy implementation of a stream reassembler.

// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.

// You will need to add private members to the class declaration in `stream_reassembler.hh`

template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}

using namespace std;

// s1, s2有重叠, 则将其拼接, 存储至str, l为公共长度
static void get_inter(int &l,
                      const int l1,
                      const int l2,
                      const std::string &s1,
                      const std::string &s2,
                      std::string &str) {
    // 查看子串是否有重叠
    int len = min(l1, l2);
    for (l = len; l >= 1; l--) {
        int i = l1 - l;
        int j;
        // 比较子串是否相等
        for (j = 0; j < l; i++, j++) {
            if (s1[i] != s2[j]) {
                break;
            }
        }

        // j = l表示没有重叠
        if (j == l) {
            break;
        }
    }

    str = s1 + s2.substr(l2 - l);
}

StreamReassembler::StreamReassembler(const size_t capacity) : buf(), _output(capacity), _capacity(capacity) {}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
// push字符串, 要求为index <= _index < index + data.size(), 此处不考虑缓存的问题
bool StreamReassembler::push_substring(const std::string &data, const uint64_t index) {
    // 有重叠
    int l1 = last_str.size();
    int l2 = data.size();
    // 忽略空字符
    if (l2 == 0) {
        return false;
    }
    // 新数据和旧数据没有重合
    if (index > _index) {
        return false;
    }
    // 新数据包含在之前的数据中
    if (_index >= index + l2) {
        return false;
    }
    // 特殊情况, 第一次insert或者相邻
    if ((l1 == 0) || (_index == index)) {
        // 拼接
        last_str += data;
        _index += l2;
        std::string new_data = data;
        _output.write(new_data);
        return true;
    }

    int l;
    // 一般情形, 此时一定有重叠
    // ____
    //   _____
    // 找到重叠位置
    std::string res_data;
    get_inter(l, l1, l2, last_str, data, res_data);
    // 重叠的后续部分
    std::string new_data = data.substr(l, l2 - l);
    // 更新last_str和_index
    last_str += new_data;
    _index += l2 - l;
    // push
    _output.write(new_data);

    return true;
}

// push字符串, 此处考虑缓存的问题
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    if (_index >= index) {
        // 如果有交集, 则push
        // ____
        //   _____
        push_substring(data, index);
    } else {
        // 如果没有交集, 则缓存到buffer中
        // ____
        //      _____
        buf.insert(data, index);
        buf.merge();
    }

    // 此时buf单调递增, 没有交集
    while (!buf.is_empty()) {
        auto pair = buf.front();
        std::string data1 = pair.first;
        uint64_t index1 = pair.second;
        // 还有数据未到达
        // ____
        //      _____
        if (index1 > _index) {
            break;
        }
        // 否则push string
        push_substring(data1, index1);
        // 弹出字符, 并更新大小
        buf.pop(true);
    }

    // 记录eof是否到达
    if (eof) {
        eof_flag = true;
    }
    // eof并且没有待处理, 则结束
    if (eof_flag && empty()) {
        _output.end_input();
    }
}

// 缓存的字符串数量
size_t StreamReassembler::unassembled_bytes() const { return buf.get_size(); }

// 缓存是否为空
bool StreamReassembler::empty() const { return unassembled_bytes() == 0; }

// Buffer
// 获得buffer的大小
size_t Buffer::get_size() const { return size; }

// 插入
void Buffer::insert(const std::string &data, const uint64_t index) {
    // 数据为空则直接返回
    if (data.size() == 0) {
        return;
    }

    if (buf.size() == 0) {
        // buf为空则直接插入
        buf.push_back(std::make_pair(data, index));
        size += data.size();
        return;
    } else {
        // 否则找到第一个it->second >= index的位置
        auto it = buf.begin();
        for (; it != buf.end(); it++) {
            if (it->second >= index) {
                break;
            }
        }

        if ((it == buf.end()) || (it->second > index)) {
            /**
             * 如果不存在, 则直接插入即可, 此时it->second < index;
             * 如果it->second > index, 则直接插入, 这种情况对应于有间隔;
             */
            buf.insert(it, std::make_pair(data, index));
            size += data.size();
            return;
        } else {
            // 如果相等则比较大小, 保留长度较大的部分
            int d1 = data.size();
            int d2 = (it->first).size();
            if (d1 > d2) {
                size -= d2;
                // 先删除后插入
                auto it1 = it;
                it1++;
                buf.erase(it);
                buf.insert(it1, std::make_pair(data, index));
                // 后插入
                size += d1;
            }
        }
    }
}

// 返回index最小的元素
std::pair<std::string, uint64_t> Buffer::front() { return *(buf.begin()); }

// 弹出元素, 如果flag为true, 则更新大小
void Buffer::pop(bool flag) {
    auto it = buf.begin();
    if (flag) {
        size -= (it->first).size();
    }
    buf.pop_front();
}

// 判断buffer是否为空
bool Buffer::is_empty() { return size == 0; }

// 合并list中重叠部分
void Buffer::merge() {
    auto it1 = buf.begin();
    auto it2 = it1;
    it2++;
    // buf大小
    int m = buf.size();

    while (it1 != buf.end()) {
        it2 = it1;
        it2++;
        // bool flag = false;
        while (it2 != buf.end()) {
            auto str1 = it1->first;
            auto str2 = it2->first;
            uint64_t l1 = str1.size();
            uint64_t l2 = str2.size();
            // 起始位置
            uint64_t s1 = it1->second;
            uint64_t s2 = it2->second;
            // 结束位置(不包括)
            uint64_t e1 = s1 + l1;
            uint64_t e2 = s2 + l2;
            // [s1, e1), [s2, e2)
            // 无交集
            if (s2 >= e1) {
                it1 = it2;
                break;
            }
            if (e2 <= e1) {
                // [s2, e2)在[s1, e1)内部
                size -= l2;
                auto it = it2;
                it2++;
                buf.erase(it);
                // break;
            } else {
                // 有交集
                // s1, s2, e1, e2
                // s1 ____ e1
                //    s2 _____ e2
                int l = e2 - e1;
                int s = l2 - l;
                // 更新字符串
                it1->first += str2.substr(s, l);
                // 更新大小
                size -= s;
                auto it = it2;
                it2++;
                buf.erase(it);
                // break;
            }
        }
        // 判断长度是否变化
        int m1 = buf.size();
        // 如果长度没有变化, 则更新it1
        if (m == m1) {
            it1 = it2;
        }
        m = m1;
    }
}

// 打印
void Buffer::print_buffer() {
    cout << "Print buffer start" << endl;
    for (auto it = buf.begin(); it != buf.end(); it++) {
        cout << it->second << " " << it->second + it->first.size() << endl;
    }
    cout << "Print buffer end" << endl;
}

测试:

make format
make -j4 && make check_lab1 

结果:

Test project /home/cs144/sponge/build
      Start 15: t_strm_reassem_single
 1/16 Test #15: t_strm_reassem_single ............   Passed    0.00 sec
      Start 16: t_strm_reassem_seq
 2/16 Test #16: t_strm_reassem_seq ...............   Passed    0.00 sec
      Start 17: t_strm_reassem_dup
 3/16 Test #17: t_strm_reassem_dup ...............   Passed    0.00 sec
      Start 18: t_strm_reassem_holes
 4/16 Test #18: t_strm_reassem_holes .............   Passed    0.00 sec
      Start 19: t_strm_reassem_many
 5/16 Test #19: t_strm_reassem_many ..............   Passed    0.24 sec
      Start 20: t_strm_reassem_overlapping
 6/16 Test #20: t_strm_reassem_overlapping .......   Passed    0.00 sec
      Start 21: t_strm_reassem_win
 7/16 Test #21: t_strm_reassem_win ...............   Passed    0.36 sec
      Start 22: t_byte_stream_construction
 8/16 Test #22: t_byte_stream_construction .......   Passed    0.00 sec
      Start 23: t_byte_stream_one_write
 9/16 Test #23: t_byte_stream_one_write ..........   Passed    0.00 sec
      Start 24: t_byte_stream_two_writes
10/16 Test #24: t_byte_stream_two_writes .........   Passed    0.00 sec
      Start 25: t_byte_stream_capacity
11/16 Test #25: t_byte_stream_capacity ...........   Passed    0.00 sec
      Start 26: t_byte_stream_many_writes
12/16 Test #26: t_byte_stream_many_writes ........   Passed    0.01 sec
      Start 27: t_webget
13/16 Test #27: t_webget .........................   Passed    0.45 sec
      Start 47: t_address_dt
14/16 Test #47: t_address_dt .....................   Passed    0.01 sec
      Start 48: t_parser_dt
15/16 Test #48: t_parser_dt ......................   Passed    0.00 sec
      Start 49: t_socket_dt
16/16 Test #49: t_socket_dt ......................   Passed    0.00 sec

100% tests passed, 0 tests failed out of 16

Total Test time (real) =   1.11 sec
[100%] Built target check_lab1