我正在阅读futures-preview
0.3 的来源,以了解如何正确地“通知任何人”。在mpsc::channel
(有界)中,多个发件人可能会等待收据(在缓冲区已满的情况下)。
查看 and 的实现next_message
,unpark_one
接收者似乎只通知一个发送者每一张收据。
我怀疑这是否适用于select!
,因为select!
可能会导致错误通知。但是,我无法提出问题案例。
这是我试图混淆的尝试mpsc
:
[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"
[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"
和这个:
#![feature(async_await, await_macro, futures_api, pin)]
use std::collections::HashSet;
use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;
async fn main2() {
let channel_len = 1;
let num_false_wait = 1000;
let num_expected_messages = 100;
let (mut send, mut recv) = channel(channel_len);
// One extra capacity per sender. Fill the extras.
await!(send.send(-2)).unwrap();
// Fill buffers
for _ in 0..channel_len {
await!(send.send(-1)).unwrap();
}
// False waits. Should resolve and produce false waiters.
for _ in 0..num_false_wait {
await!(false_wait(&send));
}
// True messages.
{
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
for i in 0..num_expected_messages {
await!(send.send(i)).unwrap();
}
Ok(())
}.boxed().compat());
}
// Drain receiver until all true messages are received.
let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
while !expects.is_empty() {
let i = await!(recv.next()).unwrap();
expects.remove(&i);
eprintln!("Received: {}", i);
}
}
// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
let (wait_send, wait_recv) = oneshot::channel();
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
let mut sending = send.send(-3);
let mut fallback = future::ready(());
select! {
sending => {
sending.unwrap();
},
fallback => {
eprintln!("future::ready is selected");
},
};
wait_send.send(()).unwrap();
Ok(())
}.boxed().compat());
await!(wait_recv).unwrap();
}
fn main() {
tokio::run(async {
await!(main2());
Ok(())
}.boxed().compat());
}
我希望这会发生:
- 缓冲区由 填充
-1
。因此,后来的发件人被阻止。 - 有“真服务员”和“假服务员”之分。假服务员已经退出,因为另一只手
select!
立刻完成。 - 在对 的每次调用中
await!(recv.next())
,最多通知一个等待的发件人。如果通知了虚假的服务员,即使缓冲区有空房间,也没有人可以推送到缓冲区。 - 如果所有元素都在没有真正通知的情况下耗尽,整个系统就会卡住。
尽管我的期望,main2
异步功能成功完成。为什么?