97

我有一个 60GB 的 SciPy 数组(矩阵),我必须在 5 个以上的multiprocessing Process对象之间共享。我看过 numpy-sharedmem 并在 SciPy 列表上阅读了这个讨论。似乎有两种方法numpy-sharedmem——使用 amultiprocessing.RawArray()并将 NumPy dtypes 映射到ctypes。现在,numpy-sharedmem似乎是要走的路,但我还没有看到一个好的参考示例。我不需要任何类型的锁,因为数组(实际上是矩阵)将是只读的。现在,由于它的大小,我想避免复制。听起来正确的方法是将数组的唯一副本创建为数组sharedmem然后将其传递给Process对象?几个具体问题:

  1. 将sharedmem 句柄实际传递给sub- Process()es 的最佳方式是什么?我需要一个队列来传递一个数组吗?管道会更好吗?我可以将它作为参数传递给Process()子类的 init(我假设它是腌制的)吗?

  2. 在我上面链接的讨论中,提到numpy-sharedmem不是 64 位安全的?我肯定使用了一些不可 32 位寻址的结构。

  3. 这种方法有权衡RawArray()吗?更慢,更麻烦?

  4. numpy-sharedmem 方法是否需要任何 ctype-to-dtype 映射?

  5. 有没有人有这样做的一些开源代码的例子?我是一个非常动手学习的人,如果没有任何好的例子可以看,很难让它发挥作用。

如果我可以提供任何其他信息来帮助其他人澄清这一点,请发表评论,我会添加。谢谢!

这需要在 Ubuntu Linux 和Maybe Mac OS 上运行,但可移植性并不是一个大问题。

4

6 回答 6

43

如果您使用的是 Linux(或任何符合 POSIX 的系统),则可以将此数组定义为全局变量。当它启动一个新的子进程时在 Linuxmultiprocessing上使用。fork()新生成的子进程会自动与其父进程共享内存,只要它不更改它(写时复制机制)。

既然你说“我不需要任何类型的锁,因为数组(实际上是一个矩阵)将是只读的”利用这种行为将是一种非常简单但非常有效的方法:所有子进程都将访问读取这个大型 numpy 数组时,物理内存中的相同数据。

不要将您的数组交给Process()构造函数,这会将数据指示multiprocessingpickle孩子,这在您的情况下效率极低或不可能。在 Linux 上,紧随其后fork()的子节点是使用相同物理内存的父节点的精确副本,因此您需要做的就是确保“包含”矩阵的 Python 变量可以从target您移交给的函数中访问Process()。这通常可以通过“全局”变量来实现。

示例代码:

from multiprocessing import Process
from numpy import random


global_array = random.random(10**4)


def child():
    print sum(global_array)


def main():
    processes = [Process(target=child) for _ in xrange(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

在不支持的 Windowsfork()multiprocessing使用 win32 API 调用CreateProcess。它从任何给定的可执行文件创建一个全新的进程。这就是为什么在 Windows 上,如果需要在父项运行时创建的数据,则需要将数据腌制给子项

于 2013-07-22T11:29:14.593 回答
31

@Velimir Mlaker 给出了一个很好的答案。我想我可以添加一些评论和一个小例子。

(我在 sharedmem 上找不到太多文档——这些是我自己实验的结果。)

  1. 您是否需要在子进程启动时或启动后传递句柄?如果只是前者,您可以只使用targetargs参数Process。这可能比使用全局变量更好。
  2. 从您链接的讨论页面中,似乎不久前将对 64 位 Linux 的支持添加到 sharedmem 中,因此这可能不是问题。
  3. 我不知道这个。
  4. 否。请参阅下面的示例。

例子

#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy

def do_work(data, start):
    data[start] = 0;

def split_work(num):
    n = 20
    width  = n/num
    shared = sharedmem.empty(n)
    shared[:] = numpy.random.rand(1, n)[0]
    print "values are %s" % shared

    processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print "values are %s" % shared
    print "type is %s" % type(shared[0])

if __name__ == '__main__':
    split_work(4)

输出

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
  0.44056863  0.10701816  0.67167752  0.29158274  0.22242552  0.14273156
  0.34912309  0.43812636  0.58484507  0.81697513  0.57758441  0.4284959
  0.7292129   0.06063283]
values are [ 0.          0.59667692  0.10761908  0.6736734   0.46349645  0.
  0.44056863  0.10701816  0.67167752  0.29158274  0.          0.14273156
  0.34912309  0.43812636  0.58484507  0.          0.57758441  0.4284959
  0.7292129   0.06063283]
type is <type 'numpy.float64'>

这个相关的问题可能很有用。

于 2013-07-30T15:54:18.030 回答
25

你可能对我写的一小段代码感兴趣:github.com/vmlaker/benchmark-sharedmem

唯一感兴趣的文件是main.py. 它是numpy-sharedmem的基准——代码只是通过 Pipe 将数组(或numpysharedmem)传递给生成的进程。工人只是调用sum()数据。我只对比较两种实现之间的数据通信时间感兴趣。

我还写了另一个更复杂的代码:github.com/vmlaker/sherlock

在这里,我使用numpy-sharedmem模块通过 OpenCV 进行实时图像处理——根据 OpenCV 的更新cv2API,图像是 NumPy 数组。图像,实际上是其中的引用,通过创建的字典对象在进程之间共享multiprocessing.Manager(与使用队列或管道相反)。与使用普通 NumPy 数组相比,我得到了很大的性能改进。

管道与队列

根据我的经验,使用 Pipe 的 IPC 比 Queue 更快。这是有道理的,因为 Queue 添加了锁定以确保多个生产者/消费者的安全。管道没有。但是,如果您只有两个进程来回交谈,那么使用 Pipe 是安全的,或者,正如文档所述:

...同时使用管道不同端的进程不存在损坏风险。

sharedmem安全

模块的主要问题sharedmem是在程序不正常退出时内存泄漏的可能性。这在此处进行了冗长的讨论。尽管在 2011 年 4 月 10 日,Sturla 提到了内存泄漏的修复,但从那以后我仍然遇到了泄漏,使用两个 repos,Sturla Molden 自己在 GitHub ( github.com/sturlamolden/sharedmem-numpy ) 和 Chris Lee-Messer 在 Bitbucket ( bitbucket.org/cleemesser/numpy-sharedmem)。

于 2013-07-28T21:45:21.810 回答
16

如果你的数组那么大,你可以使用numpy.memmap. 例如,如果您有一个存储在磁盘中的数组,例如'test.array',即使在“写入”模式下,您也可以使用同时进程来访问其中的数据,但是您的情况更简单,因为您只需要“读取”模式。

创建数组:

a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))

然后,您可以使用与普通数组相同的方式填充此数组。例如:

a[:10,:100]=1.
a[10:,100:]=2.

删除变量时,数据将存储到磁盘中a

稍后您可以使用多个进程来访问以下数据test.array

# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))

# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))

相关答案:

于 2013-07-26T11:26:30.080 回答
3

您可能还会发现查看pyro的文档很有用,就好像您可以适当地对任务进行分区一样,您可以使用它在不同的机器上以及在同一台机器的不同内核上执行不同的部分。

于 2013-07-31T05:39:20.120 回答
0

为什么不使用多线程?主进程的资源可以被其线程本地共享,因此多线程显然是共享主进程拥有的对象的更好方法。

如果你担心 python 的 GIL 机制,也许你可以求助于nogil.numba

于 2020-07-31T04:11:06.460 回答