2

我正在尝试使用以下方式发送并行异步 Rusoto SQS 请求FuturesOrdered

use futures::prelude::*; // 0.1.26
use futures::stream::futures_unordered::FuturesUnordered;
use rusoto_core::{Region, HttpClient}; // 0.38.0
use rusoto_credential::EnvironmentProvider; // 0.17.0
use rusoto_sqs::{SendMessageBatchRequest, SendMessageBatchRequestEntry, Sqs, SqsClient}; // 0.38.0

fn main() {
    let client = SqsClient::new_with(
        HttpClient::new().unwrap(),
        EnvironmentProvider::default(),
        Region::UsWest2,
    );

    let messages: Vec<u32> = (1..12).map(|n| n).collect();
    let chunks: Vec<_> = messages.chunks(10).collect();

    let tasks: FuturesUnordered<_> = chunks.into_iter().map(|c| {
        let batch = create_batch(c);
        client.send_message_batch(batch)
    }).collect();

    let tasks = tasks
        .for_each(|t| {
            println!("{:?}", t);
            Ok(())
        })
        .map_err(|e| println!("{}", e));

    tokio::run(tasks);
}

fn create_batch(ids: &[u32]) -> SendMessageBatchRequest {
    let queue_url = "https://sqs.us-west-2.amazonaws.com/xxx/xxx".to_string();
    let entries = ids
        .iter()
        .map(|id| SendMessageBatchRequestEntry {
            id: id.to_string(),
            message_body: id.to_string(),
            ..Default::default()
        })
        .collect();

    SendMessageBatchRequest {
        entries,
        queue_url,
    }
}

任务正确完成但tokio::run(tasks)不会停止。我认为这是因为tasks.for_each()会迫使它继续运行并寻找更多的未来?

为什么tokio::run(tasks)停不下来?我使用FuturesOrdered正确吗?

FuturesUnordered在创建多达 60,000 个期货以完成并将它们推入组合器时,我也有点担心内存使用情况。

4

1 回答 1

0

我发现这SqsClient是导致它阻塞的主要功能,因为即使任务完成,它仍在做一些家务。

一个 Rusoto 人提供的解决方案是在上面添加这个tokio::run

std::mem::drop(client);
于 2019-05-12T07:12:30.220 回答