我试图找到大约 61 亿个(自定义)项目的最大重量,我想通过并行处理来做到这一点。对于我的特定应用程序,有更好的算法不需要我迭代超过 61 亿个项目,但是解释它们的教科书让我头疼,我的老板希望在 4 天内完成。我想我有更好的机会使用我公司的高级服务器和并行处理。但是,我对并行处理的了解都来自阅读Python 文档。也就是说我很迷茫...
我目前的理论是设置一个馈线进程、一个输入队列、一大堆(比如 30 个)工作进程和一个输出队列(在输出队列中找到最大元素将是微不足道的)。我不明白的是馈线进程如何告诉工作进程何时停止等待项目通过输入队列。
我曾考虑过multiprocessing.Pool.map_async
在可迭代的 6.1E9 项目上使用,但仅在不对它们做任何事情的情况下迭代这些项目就需要将近 10 分钟。除非我误解了某些东西...,map_async
否则可以在流程开始工作时对它们进行迭代以将它们分配给流程。(Pool
也提供了imap
,但文档说它类似于map
,它似乎不能异步工作。我想要异步,对吧?)
相关问题:我想用concurrent.futures
代替multiprocessing
吗?我不可能是第一个实现双队列系统的人(这正是美国每个熟食店的排队方式……)所以有没有更 Pythonic/内置的方法来做到这一点?
这是我正在尝试做的事情的骨架。请参阅中间的注释块。
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
这是答案
我从 Python 基金会通讯主管 Doug Hellman 那里找到了对我的问题的一个非常彻底的答案,以及对多任务处理的温和介绍。我想要的是“毒丸”模式。在这里查看:http: //www.doughellmann.com/PyMOTW/multiprocessing/communication.html
支持@MRAB 发布该概念的核心。