1

我在我的 python 代码中使用多处理来异步运行一个函数:

import multiprocessing

po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_big_argument)))               
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

a_big_argument是一本字典。我把它作为一个论据。从某种意义上说,它在 10 到 100 Mo 之间很大。它似乎对我的代码的性能有很大的影响。

我可能在这里做一些愚蠢且效率不高的事情,因为我的代码的性能确实因这个新参数而下降。

处理大字典的最佳方法是什么?我不想每次都在我的函数中加载它。它会是创建数据库并连接到它的解决方案吗?

这是您可以运行的代码:

'''
Created on Mar 11, 2013

@author: Antonin
'''

import multiprocessing
import random

# generate an artificially big dictionary
def generateBigDict():
    myBigDict = {}
    for key in range (0,1000000):
        myBigDict[key] = 1
    return myBigDict

def myMainFunction():
    # load the dictionary
    myBigDict = generateBigDict()
    # create a list on which we will asynchronously run the subfunction
    myList = []
    for list_element in range(0,20):
        myList.append(random.randrange(0,1000000))
    # an empty set to receive results
    set_of_results = set()
    # there is a for loop here on one of the arguments
    for loop_element in range(0,150):
        results = []
        # asynchronoulsy run the subfunction
        po = multiprocessing.Pool()
        for list_element in myList:
            results.append(po.apply_async(mySubFunction, (loop_element, list_element, myBigDict)))               
        po.close()
        po.join()
        for r in results:
            set_of_results.add(r.get())
    for element in set_of_results:
        print element

def mySubFunction(loop_element, list_element, myBigDict):
    import math
    intermediaryResult = myBigDict[list_element]
    finalResult = intermediaryResult + loop_element
    return math.log(finalResult)

if __name__ == '__main__':
    myMainFunction()
4

3 回答 3

3

我曾经multiprocessing.Manager这样做过。

import multiprocessing

manager = multiprocessing.Manager()
a_shared_big_dictionary = manager.dict(a_big_dictionary)

po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_shared_big_dictionary)))               
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

现在,它要快得多。

于 2013-03-13T21:04:36.317 回答
1

请参阅python 多处理问题中共享内存对象的答案。

它建议使用multiprocessing.Array将数组传递给子进程或使用 fork()。

于 2013-03-11T22:49:26.527 回答
1

您传递给其中一种Pool方法(例如apply_async)的任何参数都需要被腌制,通过管道发送到工作进程,并在工作进程中取消腌制。这个pickle/pass/unpickle过程在时间和内存上可能会很昂贵,特别是如果你有一个大的对象图,因为每个工作进程都必须创建一个单独的副本。

根据问题的具体形式,有许多不同的方法可以避免这些泡菜。由于您的工作人员只是阅读您的字典而不是写入它,因此您可以安全地直接从您的函数中引用它(即不将其传递给apply_async)并依靠fork()避免在工作进程中创建副本。

更好的是,您可以进行更改mySubFunction(),使其接受作为参数,而不是使用andintermediaryResult查找它。(您可以通过闭包来做到这一点,但我不能 100% 确定 pickle 也不会尝试复制封闭对象。)list_elementmyBigDictmyBigDict

或者,您可以将myBigDict它放在所有进程都可以安全共享它的地方,例如简单的持久性方法之一,例如 dbm 或 sqlite,并让工作人员从那里访问它。

不幸的是,所有这些解决方案都要求您更改任务功能的形状。避免这种“变形”是人们喜欢“真正的”cpu 线程的原因之一。

于 2013-03-11T23:02:24.190 回答