3

A quick question about parallel processing in Python. Lets say I have a big shared data structure and want to apply many functions on it in parallel. These functions are read only on the data structure but perform mutation in a result object:

def compute_heavy_task(self):
    big_shared_object = self.big_shared_object
    result_refs = self.result_refs
    for ref in result_refs:
         some_expensive_task(ref, big_shared_object)

How do I do these in parallel, say 5 at a time, or 10 at a time. How how about number of processors at a time?

4

1 回答 1

4

你不能用 Python 中的线程有效地做到这一点(至少不是你可能正在使用的 CPython 实现)。Global Interpreter Lock 意味着,您想要的 8 个内核的效率接近 800%,而您只能获得 90%。

但是您可以使用单独的进程来执行此操作。标准库中内置了两个选项:concurrent.futuresmultiprocessing. 一般来说,futures在简单的情况下更简单,而且通常更容易编写;multiprocessing总体来说更加灵活和强大。futures也只附带 Python 3.2 或更高版本,但PyPI 有 2.5-3.1 的反向端口

您希望获得灵活性的一种情况multiprocessing是您拥有大型共享数据结构。有关详细信息,请参阅在进程之间共享状态以及直接在上面、下面和从中链接的部分。

如果你的数据结构真的很简单,就像一个巨大的整数数组,这很简单:

class MyClass(object):
    def __init__(self, giant_iterator_of_ints):
        self.big_shared_object = multiprocessing.Array('i', giant_iterator_of_ints)
    def compute_heavy_task(self):
        lock = multiprocessing.Lock()
        def subtask(my_range):
            return some_expensive_task(self.big_shared_object, lock, my_range)
        pool = multiprocessing.pool.Pool(5)
        my_ranges = split_into_chunks_appropriately(len(self.big_shared_object)
        results = pool.map_async(subtask, my_ranges)
        pool.close()
        pool.join()

请注意,该some_expensive_task函数现在需要一个锁对象——它必须确保在每次访问共享对象时都获得锁(或者,更常见的是,每个“事务”由一个或多个访问组成)。锁定规则可能很棘手,但如果您想使用直接数据共享,则确实没有办法解决。

另请注意,它需要一个my_range. 如果你只是在同一个对象上调用同一个函数 5 次,它会做同样的事情 5 次,这可能不是很有用。并行化事物的一种常见方法是为每个任务分配整个数据集的子范围。(除了描述起来通常很简单之外,如果你对这一点很小心,使用正确的算法,你甚至可以通过这种方式避免很多锁定。)

如果你想将一堆不同的函数映射到同一个数据集,你显然需要一些函数集合来处理,而不是some_expensive_task重复使用。然后,您可以,例如,迭代这些调用apply_async每个函数的函数。但是你也可以反过来:编写一个应用程序函数,作为数据的闭包,它接受一个函数并将其应用于数据。然后,只是map函数集合之上的那个函数。

我还假设您的数据结构是您可以使用multiprocessing.Array. 如果没有,您将不得不以 C 风格设计数据结构,将其实现为 a ctypes Arrayof Structures 或反之亦然,然后使用这些multiprocessing.sharedctypes东西。

我还将结果对象移动到刚刚传回的结果中。如果它们也很大并且需要共享,请使用相同的技巧使它们可共享。


在进一步讨论之前,您应该问自己是否真的需要共享数据。以这种方式做事,您将花费 80% 的调试、性能调整等时间来添加和删除锁,使它们或多或少地细化等等。如果您可以避免传递不可变的数据结构,或处理文件、数据库或几乎任何其他替代方案,这 80% 可以用于您的其余代码。

于 2013-04-26T22:59:10.853 回答