假设您希望有一个线程/进程来摄取数据,因为它将受 I/O 限制而不是 CPU 限制。在将数据传递到处理层之前,您只需对数据进行最少的解析和/或验证。
让我们进一步假设您可以完全并行地对每个输入项进行数据处理;这些输入项之间没有排序或时间/排序依赖性。
在这种情况下,您的任务基本上是“扇出”处理模型的典型代表。您创建一个multiprocessing.Queue对象。然后创建一个multiprocessing.Pool。这个初始化代码然后成为摄取处理任务(队列的“生产者”),进程池都成为消费者,执行处理。
网上有很多这样的例子,第一个链接可能有几个使用这种模式。
当然,剩下的问题是你将如何处理结果。
如果他们需要序列化回某个文件,那么显而易见的方法是创建两个 Queue 对象......一个用于工作队列(摄取进程提供它,池进程从中消耗),另一个是输出队列(池馈入其中,然后一个进程从中消耗以将结果连贯地写入您的输出)。请注意,让您的主(摄取)进程多路复用是可能的,而且有时非常有效。它可以将输入数据读取与输出队列上的轮询交错以写出结果。但是,当然,您也可以启动另一个专门用于输出处理的进程。
另一方面,您的结果可能可以并行写入,可能由工作进程写入。如果您将结果写入许多文件,或者将它们作为 INSERT 或 UPDATE 语句发布到某些 SQL 数据库,或者将它们提供给 Hadoop HDFS 或 Spark DataSet,这很好。有许多形式的输出适合并行写入。
您也可能希望将处理层和输出/结果处理层解耦。您的应用程序可能会通过数据处理层中的更多进程和输出层中的更少进程进行优化调整。(例如,如果每个项目的处理都是 CPU 密集型的,并且您有很多内核,那么当 CPU 处于空闲状态时,您可能会遇到太多进程阻塞 I/O 通道的问题)。
再次,使用队列。它们旨在支持多生产者和多消费者的一致性。您从担心并发锁定、死锁和活锁问题等的雷区中解脱出来。