1

我正在从数据库中迭代数 GB 的输入项。在每个输入项上,我都在进行一些 CPU 密集型处理,这会产生一个或多个新的输出项,总共有几十 GB。然后将输出项存储在另一个数据库表中。

通过使用 Rayon 进行并行处理,我得到了很好的加速。但是,数据库 API 不是线程安全的;it's Sendbut not Sync,所以 I/O 必须被序列化。

理想情况下,我只想写:

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .ser_bridge() // End parallelism. This function does not exist.
    .for_each(|output_item| {
        output_database.write_item(output_item);
    });

基本上我想要相反的par_bridge(); 在调用它的线程上运行的东西,从每个线程中读取项目,并按顺序生成它们。但在当前的 Rayon 实现中,这似乎不存在。我不确定这是否是因为它在理论上是不可能的,或者它是否不适合图书馆的当前设计。

输出太大,无法将其全部收集到Vec第一个;它需要直接流式传输到数据库中。

顺便说一句,我没有嫁给Rayon;如果有另一个更合适的板条箱,我很乐意进行转换。

4

2 回答 2

3

您可以将输出数据库包装在 anArc<Mutex>中以防止并行访问:

let output_database = Arc::new (Mutex::new (output_database));
input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each_with (output_database, |output_database, output_item| {
        output_database.lock().write_item(output_item);
    });
于 2021-03-10T13:32:26.530 回答
1

我认为顺序无关紧要,因此您不需要输出数据的顺序。

您可以使用 ampsc::channel将数据从for_each闭包传输到数据库 api,例如

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each(move |output_item| {
        tx.send(output_item).unwrap();
    });

在第二个线程中,您可以使用rx变量接收数据并将其写入数据库。

于 2021-03-10T08:58:53.350 回答