2

我希望用来multiprocessing加速一个缓慢的循环。但是,从我所看到的多处理示例来看,我不确定这种实现是否是好的实践、可行或可能。

循环大致有两个部分:data ingestiondata processing. 我希望在处理进行时开始数据摄取的下一部分,以便数据尽快可用。

伪代码:

d = get_data(n)
for n in range(N):
    p = process_data(d)
    d = get_data(n+1) #prepare data for next process loop
  1. 多处理是否适合这种功能?
  2. 如何做到这一点?

提前致谢。

4

2 回答 2

3

正如您所说,多处理基本上是调度和收集工作。正如您所澄清的,您基本上希望process_dataget_data并行工作。

这是我给你的解决方案

import multiprocessing as mp

# create pool for dispatching work
pool = mp.Pool()

# call your functions asynchronously
process_data_process = pool.apply_async(process_data, (d,))
get_data_process = pool.apply_async(get_data, (n+1,))

# After your functions are dispatched, wait for results
process_data_result = process_data_process.get()
get_data_result = get_data_process.get()

# Note: get_data_result will not be fetched till process_data_result is ready
# But that should be fine since you can't start the next batch
# till this batch is done

你可以把它包装在你的循环中。希望这能回答你的问题!

于 2018-02-14T03:19:21.180 回答
1

假设您希望有一个线程/进程来摄取数据,因为它将受 I/O 限制而不是 CPU 限制。在将数据传递到处理层之前,您只需对数据进行最少的解析和/或验证。

让我们进一步假设您可以完全并行地对每个输入项进行数据处理;这些输入项之间没有排序或时间/排序依赖性。

在这种情况下,您的任务基本上是“扇出”处理模型的典型代表。您创建一个multiprocessing.Queue对象。然后创建一个multiprocessing.Pool。这个初始化代码然后成为摄取处理任务(队列的“生产者”),进程池都成为消费者,执行处理。

网上有很多这样的例子,第一个链接可能有几个使用这种模式。

当然,剩下的问题是你将如何处理结果。

如果他们需要序列化回某个文件,那么显而易见的方法是创建两个 Queue 对象......一个用于工作队列(摄取进程提供它,池进程从中消耗),另一个是输出队列(池馈入其中,然后一个进程从中消耗以将结果连贯地写入您的输出)。请注意,让您的主(摄取)进程多路复用是可能的,而且有时非常有效。它可以将输入数据读取与输出队列上的轮询交错以写出结果。但是,当然,您也可以启动另一个专门用于输出处理的进程。

另一方面,您的结果可能可以并行写入,可能由工作进程写入。如果您将结果写入许多文件,或者将它们作为 INSERT 或 UPDATE 语句发布到某些 SQL 数据库,或者将它们提供给 Hadoop HDFS 或 Spark DataSet,这很好。有许多形式的输出适合并行写入。

您也可能希望将处理层和输出/结果处理层解耦。您的应用程序可能会通过数据处理层中的更多进程和输出层中的更少进程进行优化调整。(例如,如果每个项目的处理都是 CPU 密集型的,并且您有很多内核,那么当 CPU 处于空闲状态时,您可能会遇到太多进程阻塞 I/O 通道的问题)。

再次,使用队列。它们旨在支持多生产者和多消费者的一致性。您从担心并发锁定、死锁和活锁问题等的雷区中解脱出来。

于 2018-02-14T03:38:42.710 回答