19

我遇到了很多我想与multiprocessing模块并行的“令人尴尬的并行”项目。但是,它们通常涉及读取大文件(大于 2gb),逐行处理它们,运行基本计算,然后写入结果。使用 Python 的多处理模块拆分文件并处理它的最佳方法是什么?应该使用Queue还是JoinableQueue使用multiprocessing?还是Queue模块本身?或者,我应该使用 将可迭代的文件映射到进程池multiprocessing吗?我已经尝试过这些方法,但是逐行分发数据的开销是巨大的。我已经通过使用确定了一个轻量级管道过滤器设计cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2,它通过了第一个过程的一定百分比'),但我希望有一个完全包含在 Python 中的解决方案。

令人惊讶的是,Python 文档并没有建议这样做的规范方法(尽管multiprocessing文档中关于编程指南的部分很长)。

谢谢,文斯

附加信息:每行的处理时间各不相同。有些问题很快,几乎不受 I/O 限制,有些受 CPU 限制。受 CPU 限制的非依赖任务将从并行化中获得优势,因此即使将数据分配给处理功能的低效方式在挂钟时间方面仍然是有益的。

一个典型的例子是一个脚本,它从行中提取字段,检查各种按位标志,并将带有某些标志的行以全新格式写入新文件。这似乎是一个 I/O 绑定问题,但是当我使用带有管道的廉价并发版本运行它时,它快了大约 20%。当我使用池和地图运行它时,或者在其中排队时,multiprocessing它总是慢 100% 以上。

4

7 回答 7

9

最好的架构之一已经是 Linux 操作系统的一部分。不需要特殊的库。

你想要一个“扇出”设计。

  1. “主”程序创建许多通过管道连接的子进程。

  2. 主程序读取文件,将行写入管道,执行将行处理到适当的子进程所需的最小过滤。

每个子进程可能应该是从标准输入读取和写入的不同进程的管道。

您不需要队列数据结构,这正是内存中的管道——两个并发进程之间的字节队列。

于 2009-12-01T01:45:05.833 回答
6

一种策略是为每个工作人员分配一个偏移量,因此如果您有八个工作进程,则分配编号 0 到 7。工作编号 0 读取第一个记录处理,然后跳过 7 并继续处理第 8 条记录等,工作编号 1读取第二条记录然后跳过 7 并处理第 9 条记录.........

这种方案有很多优点。不管文件有多大,工作总是被平均分配,同一台机器上的进程将以大致相同的速率处理,并使用相同的缓冲区,因此您不会产生任何过多的 I/O 开销。只要文件尚未更新,您就可以重新运行各个线程以从故障中恢复。

于 2009-12-01T01:18:39.100 回答
4

你没有提到你是如何处理这些行的;可能是最重要的信息。

每条线是独立的吗?计算是否取决于一行在下一行之前?必须分块处理吗?每条线的处理需要多长时间?是否有一个处理步骤必须在最后包含“所有”数据?或者是否可以丢弃中间结果而只保留一个运行总数?可以通过将文件大小除以线程数来初始拆分文件吗?或者它会随着你的处理而增长?

如果行是独立的并且文件没有增长,那么您唯一需要的协调就是将“起始地址”和“长度”分配给每个工作人员;他们可以独立打开并查找文件,然后您必须简单地协调他们的结果;也许通过等待 N 个结果返回到队列中。

如果这些行不是独立的,答案将很大程度上取决于文件的结构。

于 2009-12-01T00:28:40.277 回答
1

这在很大程度上取决于文件的格式。

将它拆分到任何地方有意义吗?还是您需要将其拆分为新行?或者您是否需要确保在对象定义的末尾拆分它?

您应该在同一个文件上使用多个阅读器,而不是拆分文件,os.lseek用于跳转到文件的适当部分。

更新:海报补充说他想在新的线路上拆分。然后我提出以下建议:

假设您有 4 个进程。然后简单的解决方案是 os.lseek 到文件的 0%、25%、50% 和 75%,并读取字节,直到你打到第一个新行。这是每个过程的起点。您不需要拆分文件来执行此操作,只需在每个进程中寻找大文件中的正确位置并从那里开始读取。

于 2009-12-01T00:28:31.010 回答
1

我知道您专门询问了 Python,但我会鼓励您查看 Hadoop ( http://hadoop.apache.org/ ):它实现了专门为解决此类问题而设计的 Map 和 Reduce 算法。

祝你好运

于 2009-12-01T00:31:40.943 回答
1

Fredrik Lundh 的关于 Tim Bray 的 Wide Finder 基准的一些注释是一本有趣的读物,关于一个非常相似的用例,并提供了很多好的建议。其他许多作者也实现了相同的东西,其中一些链接自文章,但您可能想尝试使用谷歌搜索“python Wide finder”或其他内容以找到更多内容。(还有一个基于multiprocessing模块的解决方案,但似乎不再可用)

于 2009-12-01T11:03:09.787 回答
0

如果运行时间很长,则不要让每个进程通过 a 读取其下一行,而是让Queue进程读取成批的行。这样,开销分摊到几行(例如数千行或更多行)。

于 2009-12-01T01:11:36.697 回答