0

我正在构建一个小应用程序,它应该以rusoto不同的时间间隔安排两个任务(基于 AWS SDK):每 X 秒运行一个任务,每 Y 秒运行另一个。

我找到crossbeam了提供滴答计时器和select!宏的板条箱,并将它们放在一起,如下所示:

fn main() -> Result<(), Error> {
  let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
  let rt = Runtime::new().unwrap();
  let tick_a = tick(Duration::from_secs(60));
  let tick_b = tick(Duration::from_secs(30));

  loop {
    select! {
      recv(tick_a) -> _ => {
        rt.block_on(cloudwatch_client.put_metric_data( /* ... */ ));
      },
      /* similar for tick_b */
    }
  }
}

这会编译,但是程序会出现thread 'main' panicked at 'not currently running on the Tokio runtime.'. 通过分析回溯,这似乎来自 Rusoto 调用。

我在这里想念什么?有没有办法使这项工作?有没有更好的方法来处理 Rust 中的任务调度?

请注意,这个问题似乎没有解决我的问题。这个问题从使用futures::executor::block_on函数开始,然后通过使用block_ontokio 实现的方法来解决Runtime。我已经在使用中的block_on方法了Runtime

4

3 回答 3

1

线程“主”在“当前未在 Tokio 运行时运行”时发生恐慌。

如果特定库所需的 tokio 运行时版本未在运行,则会显示此错误 - 因为每个主要版本使用不同的线程局部变量,并且同一个构建中可以包含超过 1 个主要版本的库.

在您的情况下,您可能正在运行 Tokio 0.3 运行时,但 rusoto 需要 Tokio 0.2 运行时。当 rusoto 然后尝试通过 Tokio 0.2(也包含在构建中)执行 IO 时,检测到没有运行时处于活动状态并产生错误。

要解决此问题,请确保在您的项目中仅使用一个 tokio 版本。您可能需要将 tokio 降级为0.2via Cargo.toml,因为可能没有更新的 rusoto 版本可用。

还有一个不相关的建议:

除了将 crossbeam 用于计时器,您还可以在 tokio 运行时中运行“整个事情”:您可以使用 tokio::select!和 tokio timers 来做你在这里用横梁做的事情。

请参阅https://docs.rs/tokio/0.2.24/tokio/time/fn.interval.htmlhttps://docs.rs/tokio/0.2.24/tokio/macro.select.html(其中有示例类似于您的用例)

于 2020-12-29T19:33:26.310 回答
0

CloudWatchClient::put_metric_data返回一个RusotoFuture有一个sync方法做你想做的事。所以而不是:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
let rt = Runtime::new().unwrap();
rt.block_on(cloudwatch_client.put_metric_data(/* ... */));

你可以做:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
cloudwatch_client.put_metric_data(/* ... */).sync();

这也是官方文档推荐的

于 2020-12-25T14:38:25.090 回答
0

请注意:

请注意,从 v0.43.0 开始,Rusoto 使用 Rust 的 std::future::Future 和 Tokio 0.2 生态系统。从 v0.46.0 开始,Rusoto 使用 Tokio 1.0 生态系统。

对于简单的 S3 调用,我建议使用以下依赖项:

[dependencies]  
rusoto_core = "0.46.0"                            
rusoto_s3 = "0.46.0"                                                    
tokio = {version = "1.0",  features = ["full"]}  

调用可以这样实现:

use rusoto_core::Region;
use rusoto_s3::{S3, S3Client};

#[tokio::main]
async fn main() {
    let region = Region::UsEast1;
    let client = S3Client::new(region);

    match client.list_buckets().await {
        Ok(output) => match output.buckets {
            Some(bucket_list) => {
                println!("Buckets in S3:");

                for bucket in bucket_list {
                    println!("{:?}", bucket.name);
                }
            }
            None => println!("No buckets!"),
        },
        Err(error) => {
            println!("Error: {:?}", error);
        }
    }
}

设置凭据以避免凭据错误。

于 2021-01-12T18:55:03.237 回答