0

我有一些代码可以使用多处理库的 pathos 扩展来执行一些操作。我的问题是如何使用更复杂的工作函数——在本例中名为New_PP. 我应该如何格式化 thpool 行来处理我的工作函数需要的字典才能给我一个结果。access_dictPython 将字典默认为全局变量,但在工作函数的范围内,我收到与找不到此字典 ( ) 相关的错误,所以我如何发送字典或确保它对我的工作线程可用。

    Nchunks = 10
    thpool = pathos.multiprocessing.ThreadingPool()
    mppool = pathos.multiprocessing.ProcessingPool()
    Lchunk = int(len(readinfiles) / Nchunks)
    filechunks = chunks(readinfiles, 10)
    for fnames in filechunks:
            files = (open(name, 'r') for name in fnames)
            res = thpool.map(mppool.map, [New_PP]*len(fnames), files)
            print res[0][0]

和工人功能:

def New_PP(line):
    split_line = line.rstrip()
    if len(split_line) > 1:
      access_dict[4] ....

工人函数如何到达access_dict

我还尝试将我的函数包装在一个类中,如下所示:

class MAPPP:
    def New_PP(self, line):
        self.mytype = access_dict
        return my_type

    def __init__(self, value_dict):
        self.access_dict = access_dict

和:

mapp = MAPPP(value_dict)
print mapp.value_dict
res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)

但是我遇到了同样的问题。

4

1 回答 1

0

这里有几个问题:

  1. 你上面的代码有一堆错误/错别字。

  2. 当您发送时mapp.New_PP,它会复制mapp.New_PP... 因此它不会access_dict在实例之间共享,因为这些实例是在不同处理器上的不同解释器会话中创建和销毁的。

也许下面会更清楚地展示......

>>> class MAPPP(object):
...   access_dict = {}
...   def __init__(self, value_dict):
...     MAPPP.access_dict.update(value_dict)
...     return
...   def New_PP(self, line):
...     MAPPP.access_dict[line] = len(line)
...     return len(line)
... 
>>> 
>>> mapp = MAPPP({})
>>> mapp.access_dict
{}
>>> import pathos
>>> thpool = pathos.multiprocessing.ThreadingPool()
>>> mppool = pathos.multiprocessing.ProcessingPool()
>>> fnames = ['foo.txt', 'bar.txt']
>>> files = (open(name, 'r') for name in fnames)
>>> res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
>>> res
[[21, 21, 21, 21, 21, 21, 20, 21, 19], [17, 18, 17, 17, 50, 82, 98]]
>>> mapp.access_dict
{}

所以发生了什么事?文件被逐行读取……并计算每行的长度……并返回到主进程。但是,行和长度的写入并没有添加到mapp.access_dict属于mapp主进程的实例中……那是因为mapp没有传递给其他线程和处理器……它被复制了。所以,它确实起作用了......并且这些行被添加到类字典的相关副本中......但是当进程/线程完成它的工作并将行号传递回map然后关闭时,它们被垃圾收集。

pathos现在或multiprocessing现在没有“超级简单”的方法可以做到这一点。但是,如果您使用multiprocessing和,您可以做到这一点ctypes

您可能想看看multiprocessing使用共享内存和/或代理:

作为pathos作者,我计划使功能更高级……但目前还没有时间表。

于 2015-02-27T16:43:25.403 回答