1

我正在实现一个连接多路复用器 - 类,它包装单个连接以提供在其上创建所谓的Stream-s 的能力。在一个物理连接上可以有几十个这样的流。

通过该连接发送的消息由协议定义,可以是服务消息(拥塞控制等),客户端永远不会看到这些消息,也可以是数据消息 - 它们包含流的一些数据,其中一个 - 在标头中定义的相应消息。

我在readStream. 它必须是阻塞的,但异步的,以便它返回一些值 - 数据读取或发生错误 - 但请求本身必须是某种异步队列。

为了实现异步网络 IO,我们使用了 Boost 的async_read-s、async_write-s 等和完成令牌,这些令牌取自另一个库。因此,按照我之前描述的术语,调用MyConnection::underlying_connection::read(size_t)已经是异步的。

我实现的一种解决方案是 function MyConnection::processFrame(),它从连接中读取,处理消息,如果是数据消息,则将数据放入相应流的缓冲区中。该函数while将由流的read. 但是,在这种情况下,可以同时调用多个async_readUB,即 UB。此外,这意味着即使是服务消息也要等到某个流想要读取数据,这也是不合适的。

我想出的另一个解决方案是使用future-s,但是当我检查时,他们的方法wait/get会阻塞整个线程(即使使用延迟策略或配对承诺),这也必须避免。

下面是一个简化的示例,其中仅包含理解问题所需的方法。这是当前的实现,其中包含错误。

struct LowLevelConnection {
  /// completion token of 3-rd part library - ufibers
  yield_t yield;

  /// boost::asio socket
  TcpSocket socket_;

  /// completely async (in one thread) method
  std::vector<uint8_t> read(size_t bytes) {
    std::vector<uint8_t> res;
    res.reserve(bytes);

    boost::asio::async_read(socket_, res, yield);

    return res;
  }
}

struct MyConnection {
  /// header is always of that length
  constexpr uint32_t kHeaderSize = 12; 

  /// underlying connection
  LowLevelConnection connection_;

  /// is running all the time the connection is up
  void readLoop() {
    while (connection_.isActive()) {
      auto msg = connection_.read(kHeaderSize);
      if (msg.type == SERVICE) { handleService(msg); return; }

      // this is data message; read another part of it
      auto data = connection_.read(msg.data_size);

      // put the data into the stream's buffer
      streams_.find(data.stream_id).buffer.put(data);
    }
  }
}

struct Stream {
  Buffer buffer;

  // also async blocking method
  std::vector<uint8_t> read(uint32_t bytes) {
    // in perfect scenario, this should look like this
    async_wait([]() { return buffer.size() >= bytes; });

    // return the subbuffer of 'bytes' size and remove them
    return subbufer...
  }
}

感谢您未来的答案!

4

0 回答 0