15

我正在使用 concurrent.futures 来实现多处理。我收到 queue.Full 错误,这很奇怪,因为我只分配了 10 个作业。

A_list = [np.random.rand(2000, 2000) for i in range(10)]

with ProcessPoolExecutor() as pool:
    pool.map(np.linalg.svd, A_list)

错误:

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full
4

2 回答 2

20

简短的回答
我相信管道尺寸限制是根本原因。除了将数据分解成更小的块并迭代处理它们之外,您对此无能为力。这意味着您可能需要找到一种新算法,该算法一次可以处理 2000x2000 数组的一小部分,以找到奇异值组合。

详细信息
让我们马上得到一件事:您正在处理大量信息。仅仅因为您只使用十个项目并不意味着它是微不足道的。这些项目中的每一个都是一个 2000x2000 数组,其中包含 4,000,000 个浮点数,每个浮点数通常为 64 位,因此您正在查看每个数组大约 244MB,以及在Numpy 的 ndarrays中标记的其他数据。

ProcessPoolExecutor通过启动一个单独的线程来管理工作进程。管理线程使用multiprocesing.Queue将作业传递给工作人员,称为_call_queue. 这些multiprocessing.Queues 实际上只是围绕管道的精美包装,并且您尝试传递给工作人员的 ndarray 可能太大而管道无法正确处理。

阅读Python 问题 8426表明,即使您可以为您的操作系统查找一些标称管道大小限制,也很难准确计算出您的管道有多大。有太多变数使它变得简单。即使将事物从队列中拉出的顺序也会在底层管道中引发竞争条件,从而触发奇怪的错误。

我怀疑您的一名工作人员正在从其 中获取不完整或损坏的对象_call_queue,因为该队列的管道中充满了您的巨型对象。该工作人员以不干净的方式死亡,工作队列管理器检测到此故障,因此它放弃工作并告诉剩余的工作人员退出。但是它通过将毒丸传递过来来做到_call_queue这一点,毒丸仍然充满了你的巨大ndarrays。这就是你得到完整队列异常的原因 - 你的数据填满了队列,然后管理线程尝试使用同一个队列将控制消息传递给其他工作人员。

我认为这是一个典型的例子,说明了在程序中不同实体之间混合数据和控制流的潜在危险。您的大数据不仅阻止了工人接收更多数据,还阻止了经理与工人的控制通信,因为他们使用相同的路径。

我无法重现你的失败,所以我不能确定所有这些都是正确的。但是,您可以使此代码与 200x200 数组 (~2.5MB) 一起工作,这一事实似乎支持了这一理论。标称管道大小限制似乎以 KB 或最多几 MB 为单位,具体取决于操作系统和体系结构。如此大量的数据可以通过管道这一事实并不令人惊讶,尤其是当您考虑到如果消费者持续接收数据时,并非所有 2.5MB 都需要立即真正放入管道中。它建议您可以通过管道连续获取的数据量的合理上限。

于 2015-12-02T06:02:01.573 回答
11

我最近在调试一个通过管道发送各种 GB 数据的 python3.6 程序时偶然发现了这一点。这就是我发现的(希望它可以节省别人的时间!)。

就像skrrgwasme所说,如果队列管理器在发送毒丸时无法获取信号量,则会引发队列已满错误。对信号量的获取调用是非阻塞的,它会导致管理器失败(由于数据和控制流共享相同的Queue ,它无法发送“控制”命令)。请注意,上面的链接是指 python 3.6.0

现在我想知道为什么我的队列管理器会发送毒丸。一定有其他的失败!显然发生了一些异常(在其他一些子进程中?在父进程中?),队列管理器试图清理并关闭所有子进程。在这一点上,我有兴趣找到这个根本原因。

调试根本原因

我最初尝试在子进程中记录所有异常,但显然那里没有发生显式错误。从问题 3895开始:

请注意,当结果在 unpickle 时失败时, multiprocessing.Pool 也会被破坏。

似乎多处理模块在 py36 中被破坏,因为它不会正确捕获和处理序列化错误。

不幸的是,由于时间限制,我没有设法自己复制和验证问题,而是更愿意跳到行动点和更好的编程实践(不要通过管道发送所有数据:)。这里有几个想法:

  1. 尝试腌制应该通过管道运行的数据。由于我的数据(数百 GB)的巨大性质和时间限制,我无法找到哪些记录是不可序列化的。
  2. 将调试器放入 python3.6 并打印原始异常。

动作要点

  1. 如果可能的话,重构你的程序以减少通过管道发送的数据。

  2. 阅读问题 3895后,问题似乎与酸洗错误有关。另一种选择(和良好的编程实践)可能是使用不同的方式传输数据。例如,可以让子进程写入文件并将路径返回到父进程(这只是一个小字符串,可能是几个字节)。

  3. 等待未来的 python 版本。显然,在问题 3895的上下文中,这已在 python 版本标签 v3.7.0b3 上得到修复。Full异常将在shutdown_worker内部处理。在撰写本文时,Python 的当前维护版本是 3.6.5

于 2018-06-24T04:08:33.257 回答