9

我有一个使用 64 位 Python 3.3.0 CPython 解释器在 64 位 Linux(内核版本 2.6.28.4)机器上运行的自定义模拟器(用于生物学)。

因为模拟器依赖于许多独立实验以获得有效结果,所以我内置了并行处理来运行实验。线程之间的通信主要发生在使用托管 multiprocessing Queues ( doc ) 的生产者-消费者模式下。该架构的概要如下:

  • 处理生成和管理Processes 和各种Queues的主进程
  • N 个进行模拟的工作进程
  • 1 个结果消费者进程,消耗模拟结果并对结果进行排序和分析

主进程和工作进程通过输入进行通信Queue。类似地,工作进程将他们的结果放在一个输出Queue中,结果消费者进程从中消费项目。最终的 ResultConsumer 对象通过multiprocessing Pipe ( doc ) 传递回主进程。

一切正常,直到它尝试通过以下方式将 ResultConsumer 对象传递回主进程Pipe

Traceback (most recent call last):
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 95, in run
    self._target(*self._args, **self._kwargs)
  File "DomainArchitectureGenerator.py", line 93, in ResultsConsumerHandler
    pipeConn.send(resCon)
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 207, in send
    self._send_bytes(buf.getbuffer())
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 394, in _send_bytes
    self._send(struct.pack("!i", n))
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

我了解前两个跟踪(库中未处理的退出Process),第三个是我用于将 ResultConsumer 对象发送 Pipe到主进程的代码行。最后两条痕迹是有趣的地方。A Pipepickle 发送给它的任何对象,并将生成的字节传递到另一端(匹配连接),在运行时它被 unpickle recv()self._send_bytes(buf.getbuffer())正在尝试发送腌制对象的字节。 self._send(struct.pack("!i", n))正在尝试使用长度为 n 的整数(网络/大端序)打包结构,其中 n 是作为参数传入的缓冲区的长度(该struct 库处理 Python 值和表示为 Python 字符串的 C 结构之间的转换,请参阅文档)。

此错误仅在尝试大量实验时发生,例如 10 次实验不会导致它,但 1000 会始终如一(所有其他参数保持不变)。到目前为止,我最好的假设struct.error是,试图向下推送管道的字节数超过 2^32-1 (2147483647) 或 ~2 GB。

所以我的问题有两个:

  1. 我的调查陷入困境,因为struct.py基本上只是从进口_struct,我不知道那在哪里。

  2. 考虑到底层架构都是 64 位的,字节限制似乎是任意的。那么,为什么我不能通过比这更大的东西呢?此外,如果我不能改变这一点,这个问题有什么好的(阅读:简单的)解决方法吗?

注意:我认为使用 aQueue代替 aPipe不能解决问题,因为我怀疑Queue使用了类似的酸洗中间步骤。 编辑:正如 abarnert 的回答中指出的那样,这个注释是完全不正确的。

4

1 回答 1

11

我的调查陷入困境,因为 struct.py 基本上只是从 _struct 导入,我不知道那在哪里。

在 CPython 中,是从源代码树的目录中_struct构建的 C 扩展模块。您可以在此处在线找到代码。_struct.cModules

无论何时,foo.pyimport _foo几乎总是一个 C 扩展模块,通常是从_foo.c. 如果你根本找不到foo.py,它可能是一个 C 扩展模块,由_foomodule.c.

即使您不使用 PyPy,通常也值得查看等效的 PyPy 源代码。他们用纯 Python 重新实现了几乎所有的扩展模块——对于其余的(包括这种情况),底层的“扩展语言”是 RPython,而不是 C。

struct但是,在这种情况下,除了文档中的内容之外,您无需了解有关如何工作的任何信息。


考虑到底层架构都是 64 位的,字节限制似乎是任意的。

查看它调用的代码:

self._send(struct.pack("!i", n))

如果您查看文档'i'格式字符明确表示“4 字节 C 整数”,而不是“无论ssize_t是什么”。为此,您必须使用'n'. 或者您可能想要明确地使用 long long, with 'q'

你可以通过monkeypatchmultiprocessing来使用struct.pack('!q', n)。或'!q'。或以除struct. 当然,这会破坏与 non-patched 的兼容性,multiprocessing如果您尝试跨多台计算机或其他东西进行分布式处理,这可能是一个问题。但这应该很简单:

def _send_bytes(self, buf):
    # For wire compatibility with 3.2 and lower
    n = len(buf)
    self._send(struct.pack("!q", n)) # was !i
    # The condition is necessary to avoid "broken pipe" errors
    # when sending a 0-length buffer if the other end closed the pipe.
    if n > 0:
        self._send(buf)

def _recv_bytes(self, maxsize=None):
    buf = self._recv(8) # was 4
    size, = struct.unpack("!q", buf.getvalue()) # was !i
    if maxsize is not None and size > maxsize:
        return None
    return self._recv(size)

当然,不能保证这种改变就足够了。您需要通读周围的其余代码并对其进行测试。


注意:我怀疑使用 aQueue代替 aPipe不能解决问题,因为我怀疑Queue使用了类似的酸洗中间步骤。

好吧,这个问题与酸洗无关。Pipepickle用于发送长度,它使用struct. 您可以验证pickle不会有这个问题:pickle.loads(pickle.dumps(1<<100)) == 1<<100将返回True.

(在早期版本中,大型物体pickle list存在问题 - 例如, 2G 元素 - 这可能会导致问题的规模大约是您当前击中的对象的 8 倍。但这已在 3.3 中得到修复。)

同时……与其通过挖掘源代码来确定它是否有效,不如尝试一下并查看是否会更快?


另外,你确定你真的想通过隐式酸洗来传递一个 2GB 的数据结构吗?

如果我正在做一些缓慢且需要大量内存的事情,我更愿意明确说明——例如,pickle 到一个临时文件并发送路径或 fd。(如果您使用numpyorpandas或其他东西,请使用其二进制文件格式而不是pickle,但同样的想法。)

或者,更好的是,共享数据。是的,可变的共享状态是不好的……但是共享不可变的对象很好。无论你有 2GB 的大小,你能把它放在 a 中,或者放在multiprocessing.Array一个ctypes数组或结构中(数组或结构...... ? 有一些额外的代码来定义和分离结构,但是当好处可能如此之大时,值得一试。multiprocessing.sharedctypesctypesfilemmap


最后,当您认为在 Python 中发现了错误/明显缺失的功能/不合理的限制时,值得查看错误跟踪器。看起来像问题 17560:对非常大的对象使用多处理的问题?正是您的问题,并且有很多信息,包括建议的解决方法。

于 2013-05-15T23:08:44.290 回答