我想构建一个多生产者多消费者(MPMC)通道,其中包含不同的并发任务处理和生成数据。其中一些任务负责与文件系统或网络进行交互。
两个例子:
PrintOutput(String)
将由记录器、控制台输出或 GUI 使用。NewJson(String)
将由记录器或解析器使用。
为了实现这一点,我选择chan
了 MPMC 通道提供者和tokio
系统来管理通道上每个侦听器的事件循环。
在阅读了tokio 网站上的示例后,我开始实施futures::stream::Stream
for chan::Receiver
。这将允许使用 a for each future 在频道上收听。但是,这两个库的文档突出了一个冲突:
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>
尝试提取此流的下一个值,如果流完成则返回 None。
这个方法和 Future::poll 一样,是从流中提取值的唯一方法。此方法还必须通常在任务的上下文中运行,并且此 trait 的实现者必须确保此方法的实现不会阻塞,因为它可能会导致消费者表现不佳。
fn recv(&self) -> Option<T>
在此通道上接收一个值。
如果这是一个异步通道,recv 仅在缓冲区为空时阻塞。
如果这是一个同步通道,recv 仅在缓冲区为空时阻塞。
如果这是一个集合通道,recv 会阻塞,直到相应的发送发送一个值。
对于所有通道,如果通道关闭且缓冲区为空,则 recv 始终立即返回 None。(如果缓冲区在关闭的通道上非空,则返回缓冲区中的值。)
保证以与发送它们相同的顺序接收值。
这个操作永远不会恐慌!但如果通道从不关闭,它可能会死锁。
chan::Receiver
当缓冲区为空时可能会阻塞,但futures::stream::Stream
希望在轮询时永远不会阻塞。
如果空缓冲区阻塞,则没有明确的方法来确认它是空的。如何检查缓冲区是否为空以防止阻塞?
虽然Kabuki在我的雷达上并且似乎是最成熟的演员模型板条箱,但它几乎完全缺乏文档。
到目前为止,这是我的实现:
extern crate chan;
extern crate futures;
struct RX<T>(chan::Receiver<T>);
impl<T> futures::stream::Stream for RX<T> {
type Item = T;
type Error = Box<std::error::Error>;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
let &mut RX(ref receiver) = self;
let item = receiver.recv();
match item {
Some(value) => Ok(futures::Async::Ready(Some(value))),
None => Ok(futures::Async::NotReady),
}
}
}
我已经完成了一个快速测试,看看它是如何工作的。看起来不错,但正如预期的那样,在完成缓冲区后会阻塞。虽然这应该可行,但我有点担心消费者“行为不端”意味着什么。现在我将继续测试这种方法,希望我不会遇到不良行为。
extern crate chan;
extern crate futures;
use futures::{Stream, Future};
fn my_test() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let (tx, rx) = chan::async::<String>();
tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.
let incoming = RX(rx).for_each(|s| {
println!("Result: {}", s);
Ok(())
});
core.run(incoming).unwrap()
}