8

这是我最近的问题Avoiding race conditions in Python 3's multiprocessing Queues的扩展。希望这个版本的问题更具体。

TL;DR:在多处理模型中,工作进程使用 队列从队列中馈送multiprocessing.Queue,为什么我的工作进程如此空闲?每个进程都有自己的输入队列,因此它们不会相互争夺共享队列的锁,但队列实际上只是空了很多时间。主进程正在运行一个 I/O-bound 线程——这会减慢 CPU-bound 填充输入队列的速度吗?

我试图在一定的约束下找到 N 个集合的笛卡尔积的最大元素,每个集合都有 M_i 个元素(对于 0 <= i < N)。回想一下笛卡尔积的元素是长度为 N 的元组,其元素是 N 个集合的元素。我将这些元组称为“组合”以强调我正在循环遍历原始集合的每个组合这一事实。当我的函数is_feasible返回时,组合满足约束True。在我的问题中,我试图找到元素权重最大的组合:sum(element.weight for element in combination).

我的问题很大,但我公司的服务器也很大。我正在尝试将以下串行算法重写为并行算法。

from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
    """Return the largest (total-weight, combination) tuple from all
    possible combinations of the elements in the several sets, subject
    to the constraint that is_feasible(combo) returns True."""
    return max(
                map(
                    lambda combination: (
                        sum(element.weight for element in combination),
                        combination
                    ),
                    filter(
                        is_feasible, # Returns True if combo meets constraint
                        product(*sets)
                    )
                ),
                key=itemgetter(0) # Only maximize based on sum of weight
            )

我目前的多处理方法是创建工作进程并将它们与输入队列组合。当工人收到毒丸时,他们将他们见过的最佳组合放在输出队列中并退出。我从主进程的主线程填充输入队列。这种技术的一个优点是我可以从主进程产生一个辅助线程来运行一个监控工具(只是一个 REPL,我可以使用它来查看到目前为止已经处理了多少组合以及队列有多满)。

                    +-----------+
            in_q0   |   worker0 |----\
            /-------+-----------+     \
+-----------+   in_q1   +-----------+  \ out_q  +-----------+
|   main    |-----------|   worker1 |-----------|   main    |
+-----------+           +-----------+  /        +-----------+
            \-------+-----------+     /
            in_q2   |   worker2 |----/
                    +-----------+

我最初让所有工作人员从一个输入队列中读取,但发现他们都没有访问 CPU。考虑到他们一直在等待 queue.get() 解除阻塞,我给了他们自己的队列。这增加了 CPU 的压力,所以我认为工作人员更频繁地活动。但是,队列大部分时间都是空的!(我从我提到的监控 REPL 中知道这一点)。这向我表明填满队列的主循环很慢。这是那个循环:

from itertools import cycle
main():
    # (Create workers, each with its own input queue)
    # Cycle through each worker's queue and add a combination to that queue
    for combo, worker in zip(product(*sets), cycle(workers)):
        worker.in_q.put(combo)
    # (Collect results and return)

我猜瓶颈是worker.in_q.put(). 我怎样才能让它更快?我的第一直觉是让工作人员变慢,但这没有任何意义......监视器线程过于频繁地停止循环的问题是什么?我怎么能说出来?

或者,是否有另一种不涉及太多等待锁的方法来实现这一点?

4

1 回答 1

4

你的元素是什么样的?可能是酸洗它们以将它们放入队列中很慢,这显然是一个瓶颈。请注意,每个元素都被一遍又一遍地独立腌制。

如果是这种情况,这种方法可能会有所帮助:

  • 选择一个基数 >= 你的工人数量的集合。理想情况下,这将远远超过工人的数量。将此集合称为 A,并将 A 的大致相等的子集分配给每个工人。将该子集传输给每个工作人员。
  • 将除 A 之外的所有集合的全部内容分发给每个工作人员(可能通过pickle.dumps一次,然后将相同的字符串传输给每个工作人员,或者可能通过共享内存或其他方式)。
  • 然后每个工人都拥有完成其子集所需的全部信息。它可以愉快地开始product(my_A_subset, *other_sets)(可能顺序不同),在每个作业(或每三个作业或其他)之间轮询某种停止信号。这不需要通过队列,一位共享内存值可以正常工作。
于 2012-05-20T01:19:02.547 回答