12

我试图找到大约 61 亿个(自定义)项目的最大重量,我想通过并行处理来做到这一点。对于我的特定应用程序,有更好的算法不需要我迭代超过 61 亿个项目,但是解释它们的教科书让我头疼,我的老板希望在 4 天内完成。我想我有更好的机会使用我公司的高级服务器和并行处理。但是,我对并行处理的了解都来自阅读Python 文档。也就是说我很迷茫...

我目前的理论是设置一个馈线进程、一个输入队列、一大堆(比如 30 个)工作进程和一个输出队列(在输出队列中找到最大元素将是微不足道的)。我不明白的是馈线进程如何告诉工作进程何时停止等待项目通过输入队列。

我曾考虑过multiprocessing.Pool.map_async在可迭代的 6.1E9 项目上使用,但仅在不对它们做任何事情的情况下迭代这些项目就需要将近 10 分钟。除非我误解了某些东西...map_async否则可以在流程开始工作时对它们进行迭代以将它们分配给流程。(Pool也提供了imap,但文档说它类似于map,它似乎不能异步工作。我想要异步,对吧?

相关问题:我想用concurrent.futures代替multiprocessing吗?我不可能是第一个实现双队列系统的人(这正是美国每个熟食店的排队方式……)所以有没有更 Pythonic/内置的方法来做到这一点?

这是我正在尝试做的事情的骨架。请参阅中间的注释块。

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following line three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not item.is_relevant():
            continue
        current_weight = item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q)

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


这是答案

我从 Python 基金会通讯主管 Doug Hellman 那里找到了对我的问题的一个非常彻底的答案,以及对多任务处理的温和介绍。我想要的是“毒丸”模式。在这里查看:http: //www.doughellmann.com/PyMOTW/multiprocessing/communication.html

支持@MRAB 发布该概念的核心。

4

1 回答 1

6

您可以将一个特殊的终止项(例如 None)放入队列中。当一个工人看到它时,它可以把它放回去让其他工人看到,然后终止。或者,您可以将每个工作人员的一个特殊终止项放入队列中。

于 2012-05-15T19:59:27.610 回答