1

我正在尝试制作一个在连续失败Stream时消耗并截断它的函数。max_consecutive_fails然而,事情并不顺利(E0495)。我将Streams 更改为Iterators (并删除了asyncs),它很有效。为什么会这样?我怎样才能重构这段代码(工作)?

use futures::stream::Stream;
pub fn max_fail<'a, T>(stream : impl Stream<Item = Option<T>> +'a , max_consecutive_fails: usize) -> impl Stream +'a where T : 'a
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| async {
        if x.is_some(){
            consecutive_fails = 0;
            true
        }
        else{
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        }
    })
}

下面是我试图指出问题所在的最小化示例,但我仍然无法理解 rustc 错误消息。

use futures::stream::Stream;
pub fn minified_example<'a>(stream: impl Stream<Item = bool> + 'a) -> impl Stream + 'a
{
    use futures::stream::StreamExt;
    stream.take_while( |x| async { *x })
}
4

2 回答 2

2

异步块 ( async { ... }) 在捕获环境的方式上类似于闭包。默认情况下,每次使用来自另一个范围的变量都是通过引用,这意味着impl core::future::Future块创建的变量不能比它捕获的变量寿命长。

您需要使用(就像使用闭包一样)x进入块async move { ... }

于 2020-05-03T17:52:14.573 回答
0

所以Future捕获变量,编译器不够聪明,无法删除不必要的捕获,应该做的是用单独的异步块显式解开捕获。

use futures::stream::Stream;
pub fn max_fail<'a, T>(
    stream: impl Stream<Item = Option<T>> + 'a,
    max_consecutive_fails: usize,
) -> impl Stream + 'a
where
    T: 'a,
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| {
        let t = if x.is_some() {
            consecutive_fails = 0;
            true
        } else {
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        };
        return async move { t };
    })
}
于 2020-05-04T00:45:23.287 回答