0

我有一个请求服务器、检索一些数据、处理它并保存一个 csv 文件的函数。此功能应启动 20k 次。每次执行的持续时间都不同:有时会持续超过 20 分钟,有时会持续不到一秒。我决定继续multiprocessing.Pool.map并行执行。我的代码如下所示:

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been process')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)

看看是如何prints生成的,它似乎long_list_of_filenames被分成 8 个部分并分配给每个部分,CPU因为有时只是在 20 分钟的执行中被阻塞,而在这 20 分钟内没有long_list_of_filenames处理任何其他元素。我所期望的是以mapFIFO 样式安排 cpu 核心中的每个元素。

我的情况有更好的方法吗?

4

2 回答 2

1

map方法仅在所有操作完成后返回。

从泳池工人那里打印并不理想。一方面,像stdout使用缓冲这样的文件,所以在打印一条消息和它实际出现之间可能会有不同的时间。此外,由于所有工人都继承了相同的stdout,他们的输出将变得相互交织,甚至可能出现乱码。

所以我建议imap_unordered改用。这将返回一个迭代器,该迭代器将在它们可用时立即开始产生结果。唯一的问题是这会按照完成的顺序返回结果,而不是按照开始的顺序。

您的工作函数 ( get_data_and_process_it) 应该返回某种状态指示器。例如文件名和结果的元组。

def get_data_and_process_it(filename):
    ...
    if (error):
        return (filename, f'has *failed* bacause of {reason}')
    return (filename, 'has been processed')

然后你可以这样做:

with Pool(8) as p:
   for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
       print(fn, res)

这提供了有关作业何时完成的准确信息,并且由于只有父进程写入stdout,因此输出没有变化成为乱码。

此外,我建议sys.stdout.reconfigure(line_buffering=True)在程序开始的某个地方使用。这确保了stdout在每行输出之后都会刷新流。

于 2019-08-02T09:26:14.543 回答
1

map正在阻塞,而不是p.map您可以使用p.map_async. map将等待所有这些函数调用完成,以便我们连续看到所有结果。map_async以随机顺序完成工作,并且在开始新任务之前不等待正在进行的任务完成。这是最快的方法。(更多)还有一个SO 线程详细讨论了mapmap_async

多处理池类为我们处理排队逻辑。它非常适合并行运行网络抓取作业(示例)或任何可以独立分解和分发的作业。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看Queue该类(了解更多)。

于 2019-08-02T08:21:22.330 回答