1

我想构建一个多生产者多消费者(MPMC)通道,其中包含不同的并发任务处理和生成数据。其中一些任务负责与文件系统或网络进行交互。

两个例子:

  • PrintOutput(String)将由记录器、控制台输出或 GUI 使用。

  • NewJson(String)将由记录器或解析器使用。

为了实现这一点,我选择chan了 MPMC 通道提供者和tokio系统来管理通道上每个侦听器的事件循环。

在阅读了tokio 网站上的示例后,我开始实施futures::stream::Streamfor chan::Receiver。这将允许使用 a for each future 在频道上收听。但是,这两个库的文档突出了一个冲突:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>

尝试提取此流的下一个值,如果流完成则返回 None。

这个方法和 Future::poll 一样,是从流中提取值的唯一方法。此方法还必须通常在任务的上下文中运行,并且此 trait 的实现者必须确保此方法的实现不会阻塞,因为它可能会导致消费者表现不佳。

fn recv(&self) -> Option<T>

在此通道上接收一个值。

如果这是一个异步通道,recv 仅在缓冲区为空时阻塞。

如果这是一个同步通道,recv 仅在缓冲区为空时阻塞。

如果这是一个集合通道,recv 会阻塞,直到相应的发送发送一个值。

对于所有通道,如果通道关闭且缓冲区为空,则 recv 始终立即返回 None。(如果缓冲区在关闭的通道上非空,则返回缓冲区中的值。)

保证以与发送它们相同的顺序接收值。

这个操作永远不会恐慌!但如果通道从不关闭,它可能会死锁。

chan::Receiver当缓冲区为空时可能会阻塞,但futures::stream::Stream希望在轮询时永远不会阻塞。

如果空缓冲区阻塞,则没有明确的方法来确认它是空的。如何检查缓冲区是否为空以防止阻塞?

虽然Kabuki在我的雷达上并且似乎是最成熟的演员模型板条箱,但它几乎完全缺乏文档。


到目前为止,这是我的实现:

extern crate chan;
extern crate futures;

struct RX<T>(chan::Receiver<T>);

impl<T> futures::stream::Stream for RX<T> {
    type Item = T;
    type Error = Box<std::error::Error>;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        let &mut RX(ref receiver) = self;
        let item = receiver.recv();

        match item {
            Some(value) => Ok(futures::Async::Ready(Some(value))),
            None => Ok(futures::Async::NotReady),
        }
    }
}

我已经完成了一个快速测试,看看它是如何工作的。看起来不错,但正如预期的那样,在完成缓冲区后会阻塞。虽然这应该可行,但我有点担心消费者“行为不端”意味着什么。现在我将继续测试这种方法,希望我不会遇到不良行为。

extern crate chan;
extern crate futures;
use futures::{Stream, Future};

fn my_test() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = chan::async::<String>();

    tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.

    let incoming = RX(rx).for_each(|s| {
        println!("Result: {}", s);

        Ok(())
    });

    core.run(incoming).unwrap()
}
4

1 回答 1

1

chancrate 提供了一个chan_select允许非阻塞的宏recv;但是要实现Future这些原语,您还需要在通道准备好时唤醒任务(请参阅 参考资料futures::task::current())。

您可以Future使用现有的原语来实现;实施新的通常更困难。在这种情况下,您可能必须分叉chan以使其Future兼容。

似乎multiqueuecrate 有一个Future兼容的 mpmc 频道mpmc_fut_queue

于 2017-10-22T12:45:10.163 回答