19

我有大量自定义对象,我需要对其执行独立(可并行)任务,包括修改对象参数。我尝试过同时使用 Manager().dict 和“sharedmemory”,但两者都不起作用。例如:

import numpy as np
import multiprocessing as mp
import sharedmem as shm


class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn):
    test.num = np.random.randn()
    test.name = nn


if __name__ == '__main__':

    num = 10

    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)

    sh_tests = shm.empty(num, dtype=object)
    for it in range(num):
        sh_tests[it] = tests[it]
        print sh_tests[it]

    print '\n'
    workers = [ mp.Process(target=mod, args=(test, 'some') ) for test in sh_tests ]

    for work in workers: work.start()

    for work in workers: work.join()

    for test in sh_tests: print test

打印出来:

0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none


0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none

即对象没有被修改。

我怎样才能达到预期的行为?

4

4 回答 4

19

问题在于,当对象被传递给工作进程时,它们会被pickle打包,然后运送到另一个进程,在那里它们被解包并处理。您的对象并没有像克隆的那样传递给其他进程。您不返回对象,因此克隆的对象被愉快地修改,然后被丢弃。

看起来这不能直接完成(Python:可能在 2 个单独的进程之间共享内存数据)。

您可以做的是返回修改后的对象。

import numpy as np
import multiprocessing as mp



class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn, out_queue):
    print test.num
    test.num = np.random.randn()
    print test.num
    test.name = nn
    out_queue.put(test)




if __name__ == '__main__':       
    num = 10
    out_queue = mp.Queue()
    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)


    print '\n'
    workers = [ mp.Process(target=mod, args=(test, 'some', out_queue) ) for test in tests ]

    for work in workers: work.start()

    for work in workers: work.join()

    res_lst = []
    for j in range(len(workers)):
        res_lst.append(out_queue.get())

    for test in res_lst: print test

这确实导致了一个有趣的观察,即因为生成的进程是相同的,它们都以相同的随机数种子开始,因此它们生成相同的“随机”数。

于 2013-04-07T03:24:43.543 回答
5

您的代码不会尝试修改共享内存。它只是克隆单个对象。

dtype=object表示由于@tcaswell 提供的链接sharedmem中列出的原因而无法工作:

共享包含指向其他对象的引用/指针的对象图基本上是不可行的

对于可以使用共享内存的普通(值)类型,请参阅在共享内存中使用 numpy 数组进行多处理

manager方法也应该有效(它只是复制周围的对象):

import random
from multiprocessing import Pool, Manager

class Tester(object):
    def __init__(self, num=0.0, name='none'):
        self.num  = num
        self.name = name

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)

def init(L):
    global tests
    tests = L

def modify(i_t_nn):
    i, t, nn = i_t_nn
    t.num += random.normalvariate(mu=0, sigma=1) # modify private copy
    t.name = nn
    tests[i] = t # copy back
    return i

def main():
    num_processes = num = 10 #note: num_processes and num may differ
    manager = Manager()
    tests = manager.list([Tester(num=i) for i in range(num)])
    print(tests[:2])

    args = ((i, t, 'some') for i, t in enumerate(tests))
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
    for i in pool.imap_unordered(modify, args):
        print("done %d" % i)
    pool.close()
    pool.join()
    print(tests[:2])

if __name__ == '__main__':
    main()
于 2013-04-07T04:35:06.603 回答
4

我没有看到您将 shm 引用传递给子进程,所以我看不到它们所做的工作如何被写回共享内存。也许我在这里遗漏了一些东西。

或者,您是否考虑过 numpy.memmap?(顺便说一句:tcaswell,这里提到的模块似乎是:numpy-sharedmem)。

此外,您可能还想阅读 Sturla Molden 的Using Python, multiprocessing and NumPy/SciPy for parallel numeric Computing (PDF),如 unutbu 对 [StackOverflow:How do I pass large numpy arrays between python subprocesses without save to disk?] 的回答中所推荐的,以及(如何在 python 子进程之间传递大型 numpy 数组而不保存到磁盘?)。和 Joe Kington 的StackOverflow:NumPy 与多处理和 mmap

这些可能比直接相关更具启发性。

于 2013-04-07T04:27:11.350 回答
4

因为你不能在进程之间共享 Python 对象,multiprocessing所以如果你有重要的对象,任何使用的实现都将是低效的,因为你必须复制对象才能共享数据。

如果你愿意尝试不同的方法,你可以试试 Ray ( docs )!它是一个可以轻松编写并行和分布式 Python 的框架。简而言之,它使您能够并行启动 Python 函数,类似于multiprocessing,但它也更加灵活,因为 Ray 进程可以共享内存。这是您用 Ray 编写的脚本,使用“参与者”(共享对象)的概念:

# You can install Ray with pip.
import ray

import numpy as np


# Add this line to signify that you want to share Tester objects
# (called "actors" in Ray) between processes.
@ray.remote
class Tester(object):

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

    # Convert mod to be a method of the Tester object.
    def mod(self, nn):
        self.num = np.random.randn()
        self.name = nn


if __name__ == '__main__':

    # Start Ray. This allows you to create shared Testers (called "actors").
    ray.init()

    num = 10

    tests = np.empty(num, dtype=object)
    for it in range(num):
        # Create a shared Tester object (an "actor").
        tests[it] = Tester.remote(tnum=it*1.0)

    # Do some parallel work.
    for test in tests:
        test.mod.remote('some')

    # Compute the __str__ representations of each Tester in parallel.
    test_str_futures = [test.__str__.remote() for test in tests]
    # Get and print the __str__ return values. `ray.get` will block
    # until the return values are ready.
    test_strs = ray.get(test_str_futures)
    for test_str in test_strs:
        print(test_str)
于 2019-02-07T00:09:50.610 回答