我正在实现一个连接多路复用器 - 类,它包装单个连接以提供在其上创建所谓的Stream
-s 的能力。在一个物理连接上可以有几十个这样的流。
通过该连接发送的消息由协议定义,可以是服务消息(拥塞控制等),客户端永远不会看到这些消息,也可以是数据消息 - 它们包含流的一些数据,其中一个 - 在标头中定义的相应消息。
我在read
为Stream
. 它必须是阻塞的,但异步的,以便它返回一些值 - 数据读取或发生错误 - 但请求本身必须是某种异步队列。
为了实现异步网络 IO,我们使用了 Boost 的async_read
-s、async_write
-s 等和完成令牌,这些令牌取自另一个库。因此,按照我之前描述的术语,调用MyConnection::underlying_connection::read(size_t)
已经是异步的。
我实现的一种解决方案是 function MyConnection::processFrame()
,它从连接中读取,处理消息,如果是数据消息,则将数据放入相应流的缓冲区中。该函数while
将由流的read
. 但是,在这种情况下,可以同时调用多个async_read
UB,即 UB。此外,这意味着即使是服务消息也要等到某个流想要读取数据,这也是不合适的。
我想出的另一个解决方案是使用future
-s,但是当我检查时,他们的方法wait/get
会阻塞整个线程(即使使用延迟策略或配对承诺),这也必须避免。
下面是一个简化的示例,其中仅包含理解问题所需的方法。这是当前的实现,其中包含错误。
struct LowLevelConnection {
/// completion token of 3-rd part library - ufibers
yield_t yield;
/// boost::asio socket
TcpSocket socket_;
/// completely async (in one thread) method
std::vector<uint8_t> read(size_t bytes) {
std::vector<uint8_t> res;
res.reserve(bytes);
boost::asio::async_read(socket_, res, yield);
return res;
}
}
struct MyConnection {
/// header is always of that length
constexpr uint32_t kHeaderSize = 12;
/// underlying connection
LowLevelConnection connection_;
/// is running all the time the connection is up
void readLoop() {
while (connection_.isActive()) {
auto msg = connection_.read(kHeaderSize);
if (msg.type == SERVICE) { handleService(msg); return; }
// this is data message; read another part of it
auto data = connection_.read(msg.data_size);
// put the data into the stream's buffer
streams_.find(data.stream_id).buffer.put(data);
}
}
}
struct Stream {
Buffer buffer;
// also async blocking method
std::vector<uint8_t> read(uint32_t bytes) {
// in perfect scenario, this should look like this
async_wait([]() { return buffer.size() >= bytes; });
// return the subbuffer of 'bytes' size and remove them
return subbufer...
}
}
感谢您未来的答案!