5

我想利用 Tokio 的运行时来处理可变数量的异步期货。由于期货的数量在编译时是未知的,因此FuturesUnordered似乎是我最好的选择(宏,例如select!需要在编译时指定您的分支;join_all可能是可能的,但是当订单没有时,文档“在很多情况下”推荐 FuturesUnordered没关系)。

这个片段的逻辑是一个 recv() 循环被推送到期货桶中,它应该总是运行。当新数据到达时,它的解析/处理也被推送到期货桶(而不是立即处理)。这确保了接收器在响应新事件时保持低延迟,并且数据处理(可能计算昂贵的解密)与所有其他数据处理异步块(加上侦听接收器)同时发生。

顺便说一下,这个线程解释了为什么期货得到.boxed()

问题是这个神秘的错误:

错误[E0277] :`dyn futures::Future<Output = ()> + std::marker::Send` 不能在线程之间安全共享
  --> src/main.rs:27:8
    | 
27  |     }).boxed());
   |        ^^^^^  `dyn futures::Future<Output = ()> + std::marker::Send` 不能在线程之间安全共享
   | 
   = help : `dyn futures::Future<Output = ()> + std::marker::Send` 没有实现特征 `Sync`
    =注意:因为对 `Sync` 的 impl 有要求,所以需要唯一<dyn futures::Future<Output = ()> + std::marker::Send>`
    = note: 必需,因为它出现在类型 `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
    =注意:必需因为它出现在类型 `Pin<Box<dyn futures: :Future<Output = ()> + std::marker::Send>>`
    =注意:由于对 `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
    =注意:因为对 `&FuturesUnordered<Pin<Box<dyn futures::Future< 的 `std::marker::Send` 的 impl 有要求输出 = ()> + std::marker::Send>>>`
    =注意:
   需要,因为它出现在类型 `[static generator@src/main.rs:16:25: 27:6 _]` = note:必需,因为它出现在类型 `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>`
    =注意:必需因为它出现在类型 `impl futures::未来`

看起来像“递归地”推送到 UnorderedFutures (我猜不是真的,但你还能怎么称呼它?)不起作用,但我不知道为什么。此错误表明SyncBox'd & Pin'd 异步块的某些特征要求不符合FuturesUnordered—— 我猜这个要求只是强加的,因为&FuturesUnordered(在此期间使用,futures.push(...)因为该方法借用了 &self)需要它的Send特征... 或者其他的东西?

use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut futures = FuturesUnordered::new();
    let (tx, rx) = mpsc::channel(32);
    
    tokio::spawn( foo(tx) );    // Only the receiver is relevant; its transmitter is
                                // elsewhere, occasionally sending data.
    futures.push((async {                               // <--- NOTE: futures.push()
        loop {
            match rx.recv().await {
                Some(data) => {
                    futures.push((async move {          // <--- NOTE: nested futures.push()
                        let _ = data; // TODO: replace with code that processes 'data'
                    }).boxed());
                },
                None => {}
            }
        }
    }).boxed());
    
    while let Some(_) = futures.next().await {}

    Ok(())
}
4

2 回答 2

5

我将把低级错误留给另一个答案,但我相信在这里解决高级问题的一种更惯用的方法是将使用FuturesUnordered与以下内容结合起来tokio::select!

use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
pub async fn main() {
    let mut futures = FuturesUnordered::new();
    let (tx, mut rx) = mpsc::channel(32);
    
    //turn foo into something more concrete
    tokio::spawn(async move {
        let _ = tx.send(42i32).await;
    });

    loop {
        tokio::select! {
            Some(data) = rx.recv() => {
                futures.push(async move {
                    data.to_string()
                });
            },
            Some(result) = futures.next() => {
                println!("{}", result)
            },
            else => break,
        }
    }
}

您可以在此处阅读有关 select 宏的更多信息:https ://tokio.rs/tokio/tutorial/select

于 2021-05-01T19:04:01.897 回答
2

当您使用该方法将 async 块创建的未来装箱时boxed,您正试图将其强制为dyn Future + Send

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

然而,创造的未来不是Send。为什么?FuturesUnordered因为在其中,您尝试推送到借用它的 :

pub fn push(&self, future: Fut)

这意味着该async块捕获一个&FuturesUnordered. 对于要成为的类型Send,它的所有字段都必须是Send,所以对于生成的未来Send&FuturesUnordered必须是Send

要成为引用Send,类型还必须是Sync

impl<'_, T> Send for &'_ T where
    T: Sync

为了FuturesUnordered成为Sync,存储的期货也必须是Sync

impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}

但是,返回的未来boxed不一定是Sync

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

这意味着异步生成器不是Send,因此您不能将其强制为 a dyn Future + Send,并且您会收到一条令人困惑的错误消息。

Sync解决方案是向未来添加绑定,并Box::pin手动:

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

let mut futures = FuturesUnordered::<BoxedFuture>::new();

futures.push(Box::pin(async {
    loop {
        match rx.recv().await {
            Some(data) => {
                futures.push(Box::pin(async move {
                    let _ = data;
                }));
            }
            None => {}
        }
    }
}));

但是,您将遇到一堆借款问题。正如迈克尔的回答所解释的那样,更好的解决方案是使用tokio::select!而不是外部。push

于 2021-05-01T19:40:22.257 回答