8

我正在使用 Tokio 1.1 来做异步事情。我有一个async mainwith#[tokio::main]所以我已经在运行一个运行时。

main调用我希望在未来使用的非异步方法await(具体来说,我正在从数据融合数据帧中收集)。这种非异步方法具有一个特征规定的签名,该特征返回一个结构,而不是一个Future<Struct>. 据我所知,我无法将其标记为异步。

如果我尝试打电话df.collect().await;,我会得到

只允许在async函数和块内部

来自编译器的错误,指出我在其中调用的方法await不是async.

如果我尝试block_on从这样的新运行时获得未来:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

我得到一个运行时恐慌:

无法从运行时内启动运行时。发生这种情况是因为一个函数(如block_on)在线程被用于驱动异步任务时试图阻塞当前线程。

如果我尝试futures::executor::block_on(df.collect()).unwrap();,我会得到一个新的运行时恐慌:

“当前未在 Tokio 0.2.x 运行时上运行。”

这很奇怪,因为我使用的是 Tokio v1.1。

这感觉比它应该的更难。我在异步上下文中,感觉编译器应该知道这一点并允许我.await从方法内调用——唯一的代码路径从块内调用此方法async。有没有一种我想念的简单方法来做到这一点?

4

1 回答 1

7

我在异步上下文中,感觉编译器应该知道这一点并允许我从方法中调用 .await

await无论您是否在运行时的上下文中,基本上不可能进入同步函数内部。await转换为屈服点,async函数转换为状态机,利用这些屈服点执行异步计算。如果不将您的函数标记为async,则这种转换是不可能的。

如果我正确理解您的问题,您将拥有以下代码:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar()
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

问题是您不能df.collect从内部等待bar,因为它没有标记为async。如果您可以修改 的签名Trait,那么您可以使用如何在特征中定义异步方法中Trait::bar提到的解决方法创建一个异步方法?.

如果您无法更改 的签名Trait,那么您就有问题了。异步函数不应该花费很长时间而不达到.await. 正如在 future-rs 中封装阻塞 I/O 的最佳方法是什么?,您可以spawn_blocking在转换为非异步代码时使用:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    tokio::task::spawn_blocking(move || foo.bar()).await
}

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        df.collect().await
    }
}

现在您需要一种df.collect无需等待即可运行完成的方法。您提到您尝试创建嵌套运行时来解决此问题:

如果我尝试从新的运行时阻止未来......我会感到恐慌

但是,tokio 不允许您创建嵌套运行时。您可以创建一个新的独立运行时,如如何在另一个 Tokio 运行时中创建 Tokio 运行时中所述。但是,生成嵌套运行时效率会很低。

您可以获取当前运行时的句柄,而不是生成新的运行时:

let handle = Handle::current();

输入运行时上下文:

handle.enter();

然后运行 ​​future 以完成futures::executor::block_on

impl Trait for Foo { 
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        handle.enter();
        futures::executor::block_on(df.collect())
    }
}

进入 tokio 运行时上下文将解决您之前遇到的错误:

如果我尝试 futures::executor::block_on(df.collect()).unwrap();,我会得到一个新的运行时恐慌not currently running on a Tokio 0.2.x runtime

如果可以的话,我会敦促你尽量避免这样做。最佳解决方案是标记Trait::bar为正常asyncawait正常。任何其他解决方案,包括上面提到的那些,都涉及阻塞当前线程,直到给定的未来完成。

信用@AliceRyhl 的解释

于 2021-02-19T16:05:41.780 回答