
I am slicing up a large ctypes array and processing the pieces in parallel. I am getting the error below, and I believe it occurs because one piece of the array finishes processing before another. I tried using process.join() to make the first set of processes wait, but that is not working. Ideas?

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

Using:

    ....

    with closing(multiprocessing.Pool(initializer=init(array))) as p:
        del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

        step = y // cores
        if step != 0:
            jobs = []
            for i in range(0, y, step):
                process = p.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
                jobs.append(process)
                process.start()

            for j in jobs:
                j.join()

    del jobs
    del process

Update:

    #Create a ctypes array
    array = ArrayConvert.SharedMemArray(array)
    #Create a global of options
    init_options(options) #options is a dict
    with closing(multiprocessing.Pool(initializer=init(array))) as p:
        del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

        step = y // cores
        if step != 0:
            for i in range(0, y, step):
                #Package all the options into a global dictionary
                p.map_async(stretch, [slice(i, i+step)])

                #p.apply_async(stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)

    p.join()

def init_options(options_):
    global kwoptions
    kwoptions = options_

The function I am passing to map_async lives in a different module, so I am struggling to get the global kwoptions through to it. Passing globals around between modules like this does not seem right (it's unpythonic). Is this the way to pass kwargs through map_async?

Should I be reworking the multiprocessing using something different (apply or Process)?


2 Answers


So I got this working by reworking the code and removing the pool (per J.F. Sebastian's comment).

In pseudocode:

initialize the shared array
determine step size
create an empty list of jobs
create the process, pass it the kwargs, and append it to the job list
start the jobs
join the jobs

In case this helps any googlers, here is the code:

    #Initialize the ctypes array
    init(array)
    #Remove the reference to the array (to preserve memory on multiple iterations)
    del array

    step = y // cores
    jobs = []
    if step != 0:
        for i in range(0, y, step):
            p = multiprocessing.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
            jobs.append(p)

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()
Answered 2012-08-08T18:14:31.873

Pool()'s initializer parameter takes a function; its arguments are supplied separately via initargs. Replace initializer=init(array) with initializer=init, initargs=(array,).
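
For illustration, here is a minimal, self-contained sketch of that corrected construction (the names here are placeholders, not the asker's code):

    import multiprocessing as mp

    def init(shared_array_):
        # runs once inside each worker process, not in the parent
        global shared_array
        shared_array = shared_array_

    if __name__ == "__main__":
        array = mp.RawArray('i', 10)
        # pass init uncalled; the pool invokes init(*initargs) in every worker
        pool = mp.Pool(initializer=init, initargs=(array,))
        pool.close()
        pool.join()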

To pass keyword arguments to a function f() used with the pool.*map* family, you can create a wrapper mp_f():

#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start]*b # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs) # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N) # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N//4) # dummy options
        step = options['b']
        args = ((slice(i, i+step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args): # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__=="__main__":
    mp.freeze_support() # for py2exe and the-like on Windows
    main()
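
As a side note: on Python 3, where functools.partial objects can be pickled, binding the fixed keyword arguments with partial is an alternative to the mp_f() wrapper. A standalone sketch under that assumption (dummy numbers, not part of the original answer):

    #!/usr/bin/env python3
    from functools import partial
    import multiprocessing as mp

    def f(interval, a=None, b=None):
        return interval.start + a * b  # stand-in computation

    if __name__ == "__main__":
        N, step = 100, 25
        with mp.Pool() as p:
            # partial binds the keyword args once; f must live at module
            # top level so the partial can be pickled
            worker = partial(f, a=5, b=2)
            results = p.map(worker, [slice(i, i+step) for i in range(0, N, step)])
        print(results)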
Answered 2012-08-09T10:28:27.683