2

在使用多处理模块时,我遇到了一个常见的问题,即 pickle 错误。

我的确切问题是我需要在函数中调用它之前给我正在调用的函数提供一些状态pool.map,但这样做会导致attribute lookup __builtin__.function failed错误 found here

根据链接的 SO 答案,看起来使用函数的唯一方法pool.map是调用定义的函数本身,以便在当前函数的范围之外查找它。

我觉得我对上面的解释很差,所以这是代码中的问题。:)

无池测试

# Function to be called by the multiprocessing pool
def my_func(x):
    massive_list, medium_list, index1, index2 = x
    result = [massive_list[index1 + x][index2:] for x in xrange(10)]
    return result in medium_list



if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = map(my_func, to_crunch)

这工作正常,正如预期的那样。它唯一的“错误”是它很慢。

池尝试 1

# (Note: my_func() remains the same)
if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = pool.map(my_func, to_crunch)

这在技术上是可行的,但速度慢了惊人的 18 倍!速度变慢不仅来自于在每次调用中复制两个海量数据结构,而且还来自于在它们传递时对它们进行腌制/解封。非池版本受益于只需将引用传递给大量列表,而不是实际列表。

因此,在找到瓶颈后,我尝试将两个庞大的列表存储为my_func. 这样,如果我理解正确,每个工人只需要复制一次(在我的例子中,4)。

池尝试 2:

我结束了my_func一个闭包,将两个列表作为存储状态传递。

def build_myfunc(m,s):
    def my_func(x):
        massive_list = m # close the state in there
        small_list = s

        index1, index2 = x
        result = [massive_list[index1 + x][index2:] for x in xrange(10)]
        return result in medium_list
    return my_func

if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    modified_func = build_myfunc(data, source)

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = pool.map(modified_func, to_crunch)

但是,这会返回 pickle 错误,因为(基于上面链接的 SO 问题)您不能从同一范围内调用具有多处理功能的函数。

错误:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

那么,有没有办法解决这个问题?

4

1 回答 1

0

Map 是一种分配工作负载的方法。如果您将数据存储在 func 中,我认为您将失去最初的目的。

让我们试着找出它变慢的原因。这不正常,肯定有别的原因。

首先,进程的数量必须适合运行它们的机器。在您的示例中,您使用的是一个由 2 个进程组成的池,因此总共涉及 3 个进程。您正在使用的系统上有多少个内核?还有什么在运行?处理数据时的系统负载是多少?该函数对数据有什么作用?它访问磁盘吗?或者它可能使用 DB,这意味着可能有另一个进程访问磁盘和内核。内存呢?存储初始列表是否足够?

正确的实施是您的尝试 1。

尝试使用iostat例如分析执行。这样您就可以发现瓶颈。

如果它在 cpu 上停止,那么您可以尝试对代码进行一些调整。

从Stackoverflow 上的另一个答案(我所以没问题复制并粘贴到这里:P):

您正在使用.map()which 收集结果然后返回。因此,对于大型数据集,您可能会陷入收集阶段。

您可以尝试使用.imap()which is the iterator version on.map() 甚至if 结果的.imap_unordered() 顺序并不重要(从您的示例中可以看出)。

是相关文档。值得注意的是:

对于很长的迭代,使用较大的 chunksize 值可以使作业完成比使用默认值 1 快得多。

于 2013-10-11T09:28:10.247 回答