40

异步示例很有用,但作为 Rust 和 Tokio 的新手,我正在努力研究如何一次执行 N 个请求,使用来自向量的 URL,并为每个 URL 创建一个响应 HTML 的迭代器作为字符串。

怎么可能做到这一点?

4

2 回答 2

95

并发请求

从 reqwest 0.10 开始:

use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter

获取一组字符串并将其转换为Stream.

.map(|url| {

StreamExt::map

对流中的每个元素运行异步函数并将元素转换为新类型。

let client = &client;
async move {

对 进行显式引用Client并将引用(不是原始的Client)移动到匿名异步块中。

let resp = client.get(url).send().await?;

使用 的连接池启动异步 GET 请求Client并等待请求。

resp.bytes().await

请求并等待响应的字节。

.buffer_unordered(N);

StreamExt::buffer_unordered

将期货流转换为这些期货的价值流,同时执行期货。

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

将流转换回单个未来,打印出沿途接收的数据量,然后等待未来完成。

也可以看看:

无限制执行

如果您愿意,您还可以将迭代器转换为期货迭代器并使用future::join_all

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

我鼓励使用第一个示例,因为您通常希望限制并发性,这bufferbuffer_unordered帮助。

并行请求

并发请求通常足够好,但有时您需要并行请求。在这种情况下,您需要生成一个任务。

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

主要区别是:

  • 我们tokio::spawn习惯于在不同的任务中执行工作。
  • 我们必须赋予每项任务自己的任务reqwest::Client。按照建议,我们克隆一个共享客户端以使用连接池。
  • 当任务无法加入时,还有一个额外的错误情况。

也可以看看:

于 2018-06-26T16:41:24.870 回答
-1

如果可能解决您的问题,我建议使用 std async 和 rayon。它们现在都很成熟,而且很容易上手,因为这里有 async{/* 代码 */} 范围界限。您还可以通过功能集成进入/与 tokio 一起工作https://docs.rs/async-std/1.10.0/async_std/#features

于 2021-10-25T23:02:14.790 回答