我正在从数据库中迭代数 GB 的输入项。在每个输入项上,我都在进行一些 CPU 密集型处理,这会产生一个或多个新的输出项,总共有几十 GB。然后将输出项存储在另一个数据库表中。
通过使用 Rayon 进行并行处理,我得到了很好的加速。但是,数据库 API 不是线程安全的;it's Send
but not Sync
,所以 I/O 必须被序列化。
理想情况下,我只想写:
input_database
.read_items()
.par_bridge() // Start parallelism.
.flat_map_iter(|input_item| {
// produce an Iterator<Item = OutputItem>
})
.ser_bridge() // End parallelism. This function does not exist.
.for_each(|output_item| {
output_database.write_item(output_item);
});
基本上我想要相反的par_bridge()
; 在调用它的线程上运行的东西,从每个线程中读取项目,并按顺序生成它们。但在当前的 Rayon 实现中,这似乎不存在。我不确定这是否是因为它在理论上是不可能的,或者它是否不适合图书馆的当前设计。
输出太大,无法将其全部收集到Vec
第一个;它需要直接流式传输到数据库中。
顺便说一句,我没有嫁给Rayon;如果有另一个更合适的板条箱,我很乐意进行转换。