我遇到了很多我想与multiprocessing
模块并行的“令人尴尬的并行”项目。但是,它们通常涉及读取大文件(大于 2gb),逐行处理它们,运行基本计算,然后写入结果。使用 Python 的多处理模块拆分文件并处理它的最佳方法是什么?应该使用Queue
还是JoinableQueue
使用multiprocessing
?还是Queue
模块本身?或者,我应该使用 将可迭代的文件映射到进程池multiprocessing
吗?我已经尝试过这些方法,但是逐行分发数据的开销是巨大的。我已经通过使用确定了一个轻量级管道过滤器设计cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2
,它通过了第一个过程的一定百分比'),但我希望有一个完全包含在 Python 中的解决方案。
令人惊讶的是,Python 文档并没有建议这样做的规范方法(尽管multiprocessing
文档中关于编程指南的部分很长)。
谢谢,文斯
附加信息:每行的处理时间各不相同。有些问题很快,几乎不受 I/O 限制,有些受 CPU 限制。受 CPU 限制的非依赖任务将从并行化中获得优势,因此即使将数据分配给处理功能的低效方式在挂钟时间方面仍然是有益的。
一个典型的例子是一个脚本,它从行中提取字段,检查各种按位标志,并将带有某些标志的行以全新格式写入新文件。这似乎是一个 I/O 绑定问题,但是当我使用带有管道的廉价并发版本运行它时,它快了大约 20%。当我使用池和地图运行它时,或者在其中排队时,multiprocessing
它总是慢 100% 以上。