4

我读过的最新帖子都说异步是执行大量 I/O 绑定工作(例如发送 HTTP 请求等)的更好方法。我最近尝试使用异步,但在理解如何并行发送多组请求方面遇到了困难,例如:

let client = reqwest::Client::new();
let mut requests = 0;

let get = client.get("https://somesite.com").send().await?;
let response = get.text().await?;

if response.contains("some stuff") {
    let get = client.get("https://somesite.com/something").send().await?;
    let response = get.text().await?;

    if response.contains("some new stuff") {
        requests += 1;
        println!("Got response {}", requests)

这就是我想要的,但是我怎样才能并行运行它并控制“工作线程”的数量或异步线程池的等价物?

我知道它与这个问题相似,但我的意思是严格谈论夜间 Rust 异步/等待语法和需要完成请求/任务组的更具体的用例。我还发现在这些情况下使用组合器有点令人困惑,希望更新的样式有助于使其更具可读性。

4

1 回答 1

0

不确定这是否是最快的方法,因为我只是在尝试自己,但这是我的解决方案:

let client = reqwest::Client::new();

let links = vec![ // A vec of strings representing links
    "example.net/a".to_owned(), 
    "example.net/b".to_owned(),
    "example.net/c".to_owned(),
    "example.net/d".to_owned(),
    ];

let ref_client = &client; // Need this to prevent client from being moved into the first map
futures::stream::iter(links)
    .map(async move |link: String| {
        let res = ref_client.get(&link).send().await;

        // res.map(|res| res.text().await.unwrap().to_vec())
        match res { // This is where I would usually use `map`, but not sure how to await for a future inside a result 
            Ok(res) => Ok(res.text().await.unwrap()),
            Err(err) => Err(err), 
        }
    })
    .buffer_unordered(10) // Number of connection at the same time
    .filter_map(|c| future::ready(c.ok())) // Throw errors out, do your own error handling here
    .filter_map(|item| {
        if item.contains("abc") {
            future::ready(Some(item))
        } else {
            future::ready(None)
        }
    })
    .map(async move |sec_link| {
        let res = ref_client.get(&sec_link).send().await;
        match res {
            Ok(res) => Ok(res.text().await.unwrap()),
            Err(err) => Err(err),
        }
    })
    .buffer_unordered(10) // Number of connections for the secondary requests (so max 20 connections concurrently)
    .filter_map(|c| future::ready(c.ok()))
    .for_each(|item| {
        println!("File received: {}", item);
        future::ready(())
    })
    .await;

这需要该#![feature(async_closure)]功能。

于 2019-10-24T03:23:19.627 回答