我想利用 Tokio 的运行时来处理可变数量的异步期货。由于期货的数量在编译时是未知的,因此FuturesUnordered似乎是我最好的选择(宏,例如select!
需要在编译时指定您的分支;join_all可能是可能的,但是当订单没有时,文档“在很多情况下”推荐 FuturesUnordered没关系)。
这个片段的逻辑是一个 recv() 循环被推送到期货桶中,它应该总是运行。当新数据到达时,它的解析/处理也被推送到期货桶(而不是立即处理)。这确保了接收器在响应新事件时保持低延迟,并且数据处理(可能计算昂贵的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。
顺便说一下,这个线程解释了为什么期货得到.boxed()
。
问题是这个神秘的错误:
错误[E0277] :`dyn futures::Future<Output = ()> + std::marker::Send` 不能在线程之间安全共享 --> src/main.rs:27:8 | 27 | }).boxed()); | ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` 不能在线程之间安全共享 | = help : `dyn futures::Future<Output = ()> + std::marker::Send` 没有实现特征 `Sync` =注意:因为对 `Sync` 的 impl 有要求,所以需要唯一<dyn futures::Future<Output = ()> + std::marker::Send>` = note: 必需,因为它出现在类型 `Box<dyn futures::Future<Output = ()> + std::marker::Send>` =注意:必需因为它出现在类型 `Pin<Box<dyn futures: :Future<Output = ()> + std::marker::Send>>` =注意:由于对 `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>` =注意:因为对 `&FuturesUnordered<Pin<Box<dyn futures::Future< 的 `std::marker::Send` 的 impl 有要求输出 = ()> + std::marker::Send>>>` =注意: 需要,因为它出现在类型 `[static generator@src/main.rs:16:25: 27:6 _]` = note:必需,因为它出现在类型 `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>` =注意:必需因为它出现在类型 `impl futures::未来`
看起来像“递归地”推送到 UnorderedFutures (我猜不是真的,但你还能怎么称呼它?)不起作用,但我不知道为什么。此错误表明Sync
Box'd & Pin'd 异步块的某些特征要求不符合FuturesUnordered
—— 我猜这个要求只是强加的,因为&FuturesUnordered
(在此期间使用,futures.push(...)
因为该方法借用了 &self)需要它的Send
特征... 或者其他的东西?
use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let mut futures = FuturesUnordered::new();
let (tx, rx) = mpsc::channel(32);
tokio::spawn( foo(tx) ); // Only the receiver is relevant; its transmitter is
// elsewhere, occasionally sending data.
futures.push((async { // <--- NOTE: futures.push()
loop {
match rx.recv().await {
Some(data) => {
futures.push((async move { // <--- NOTE: nested futures.push()
let _ = data; // TODO: replace with code that processes 'data'
}).boxed());
},
None => {}
}
}
}).boxed());
while let Some(_) = futures.next().await {}
Ok(())
}