
I have a program that takes a list. For each value in that list, it retrieves another list and processes it.

Basically, it is a three-level-deep tree that needs to do potentially expensive processing at every node.

Each node needs to be able to process the results of its children.

What I would like to do is map from the input list at the first layer to the result of each node. Within each of those processes, however, I would like to map again to get the results of the next layer down.

What I am worried about is that each layer would have its own maximum number of workers. If possible, I would like them to share one process pool; otherwise all the process switching would hurt performance.

Is there a way, using concurrent.futures or some other method, to have every layer share the same process pool?

An example would be:

import concurrent.futures

def main():
    my_list = [1, 2, 3, 4]
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(my_function, zip(my_list, [executor] * len(my_list)))
        # process results

def my_function(args):
    sub_list = args[0]
    executor = args[1]
    new_list = process(sub_list)  # process() is a placeholder
    results = executor.map(second_function, new_list)
    # process results
    # return processed results

def second_function(values):
    ...

That way, every subprocess would draw from the same pool.

Alternatively, could I do something like (but not exactly)

import concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor

and have every call to executor draw from the same process pool?
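As an aside, the first sketch would fail even before any nesting problem shows up: `executor.map` has to pickle each argument to send it to a worker process, and an executor holds locks and queues that cannot be pickled. A quick check (illustrative, not from the original question):

```python
import concurrent.futures
import pickle

# map() would have to pickle the executor to ship it to the workers.
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    try:
        pickle.dumps(executor)
        picklable = True
    except TypeError:
        # executors contain thread locks and queues, which pickle rejects
        picklable = False

print("executor picklable:", picklable)
```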


1 Answer


The problem is that your pool has 4 threads and you try to wait for 20 of them, so there are not enough threads to do what you want.

Put differently: my_function is executed in a worker thread. When it calls map, that thread blocks. That is one thread fewer available to execute the mapped calls; waiting on the futures ties the thread up.

My solution is to use yield and yield from statements that hand back futures, which removes the blocking on futures and threads. All the futures are created, then a yield interrupts the current execution and frees the thread. That thread can then go work on the mapped futures. Once a future completes, a registered callback executes the next() step of the generator.
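The driving mechanism can be sketched in miniature (a simplified illustration of the same idea, not the answer's exact code): a generator yields the futures it would otherwise block on, and a done-callback re-enters the generator when each future completes, so no thread sits blocked in the meantime.

```python
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
result = concurrent.futures.Future()  # completed when the generator returns

def drive(generator):
    """Advance the generator one step; re-arm via the yielded future."""
    try:
        future = next(generator)
    except StopIteration as stop:
        result.set_result(stop.value)  # the generator's return value
    else:
        # Instead of blocking the thread, resume once the future is done.
        future.add_done_callback(lambda _: drive(generator))

def task():
    f = executor.submit(lambda: 21)
    yield f                 # frees the thread while f runs
    return f.result() * 2   # f is done here, so result() does not block

drive(task())
print(result.result())  # 42
executor.shutdown()
```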

To solve the proxying of existing objects, this question has to be solved first: How to proper setup multiprocessing proxy objects for objects that already exists

So, suppose we want to execute the following recursion: a recursive parallel sum over the list [[1, [2, [3, 3, 3], 2], 1], 0, 0].

We can expect the following output:

tasks: [[1, [2, [3, 3, 3], 2], 1], 0, 0]
tasks: [1, [2, [3, 3, 3], 2], 1]
tasks: 0
tasks: 0
tasks: 1
tasks: [2, [3, 3, 3], 2]
tasks: 1
tasks: 2
tasks: [3, 3, 3]
tasks: 2
tasks: 3
tasks: 3
tasks: 3
v: 15
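For comparison, a plain sequential version of the same recursion (not part of the original answer) gives the same total:

```python
def plain_sum(tasks):
    # base case: a leaf is just an integer
    if isinstance(tasks, int):
        return tasks
    # recursive case: sum the recursively summed children
    return sum(plain_sum(task) for task in tasks)

print(plain_sum([[1, [2, [3, 3, 3], 2], 1], 0, 0]))  # 15
```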

Here is the code that introduces a recursion-enabled ThreadPoolExecutor:

import time  # needed by recursive_map when a timeout is given
import traceback
from concurrent.futures.thread import *
from concurrent.futures import *
from concurrent.futures._base import *
##import hanging_threads

class RecursiveThreadPoolExecutor(ThreadPoolExecutor):

    # updated version here: https://gist.github.com/niccokunzmann/9170072

    def _submit(self, fn, *args, **kwargs):
        return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        real_future = Future()
        def generator_start():
            try:
##                print('start', fn, args, kwargs)
                generator = fn(*args, **kwargs)
##                print('generator:', generator)
                def generator_next():
                    try:
##                        print('next')
                        try:
                            future = next(generator)
                        except StopIteration as stop:
                            real_future.set_result(stop.args[0])
                        else:
                            if future is None:
                                self._submit(generator_next)
                            else:
                                future.add_done_callback(lambda future: generator_next())
                    except:
                        traceback.print_exc()
                self._submit(generator_next)
##                print('next submitted 1')
            except:
                traceback.print_exc()
        self._submit(generator_start)
        return real_future

    def recursive_map(self, fn, *iterables, timeout=None):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            yield from fs
            return fs
        return result_iterator()

if __name__ == '__main__':

    def f(args):
        executor, tasks = args
        print('tasks:', tasks)
        if type(tasks) == int:
            return tasks
        # waiting for all futures without blocking the thread
        futures = yield from executor.recursive_map(f, [(executor, task) for task in tasks]) 
        return sum([future.result() for future in futures])

    with RecursiveThreadPoolExecutor(max_workers = 1) as executor:
        r = executor.map(f, [(executor, [[1,[2,[3,3,3],2],1],0,0],)] * 1)
        import time
        time.sleep(0.1)

        for v in r:
            print('v: {}'.format(v))

An updated version can be found here: https://gist.github.com/niccokunzmann/9170072

Sadly, I cannot implement this for processes with the multiprocessing module right now. You can do it; the only thing that needs to be done is to create a proxy object for the generator_start and generator_next functions. If you do so, please let me know.

Solving the proxy problem for methods would also answer this question: How to proper setup multiprocessing proxy objects for objects that already exists

answered 2014-02-23T10:26:58.783