8

想象一些futures存储在一个Vec长度由runtime确定的a中,你应该同时加入这些futures,你应该怎么做?

显然,通过文档中的示例tokio::join,手动指定可能的每个长度Vec,例如 1、2、3,......并且处理可敬的情况应该有效。

extern crate tokio;

let v = Vec::new();
v.push(future_1);

// directly or indirectly you push many futures to the vector
 
v.push(future_N);

// to join these futures concurrently one possible way is 

if v.len() == 0 {}
if v.len() == 1 { join!(v.pop()); }
if v.len() == 2 { join!(v.pop(), v.pop() ); }
// ...

而且我还注意到 tokio::join! 当我使用类似的语法时,将列表作为文档中的参数

tokio::join!(v);

或类似的东西

tokio::join![ v ] /  tokio::join![ v[..] ] / tokio::join![ v[..][..] ]

它只是不起作用

问题来了,是否有任何途径可以更有效地加入这些期货,或者我是否应该错过文件所说的内容?

4

2 回答 2

14

您可以使用futures::future::join_all将您的期货集合“合并”成一个单一的未来,当所有的子期货都解决时,它就会解决。

于 2020-08-26T04:10:02.960 回答
3

join_alltry_join_all,以及来自同一个 crate 的更多功能FuturesOrdered和实用程序,作为单个任务执行。如果组成的期货不经常同时准备好执行工作,这可能很好,但是如果您想利用多线程运行时的 CPU 并行性,请考虑将各个期货作为单独的任务生成并等待连接句柄:FuturesUnorderedfutures

use futures::future;

// ...

let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

您还可以使用FuturesOrderedandFuturesUnordered组合子在流中异步处理输出:

use futures::stream::FuturesUnordered;
use futures::prelude::*;

// ...

let mut completion_stream = v.into_iter()
    .map(tokio::spawn)
    .collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
    // ...    
}

使用任务的一个警告是,当产生任务并可能拥有返回的未来(例如异步块)JoinHandle被丢弃时,它们不会被取消。该JoinHandle::abort方法需要用于显式取消任务。

于 2021-10-03T11:51:08.110 回答