
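concurrent.futures.Executor.map takes a variable number of iterables from which the given function is called. How should I call it if I have a generator that produces tuples that would normally be unpacked in place?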

The following doesn't work, because each of the generated tuples is given as a separate argument to map:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass

Without the generator, the desired arguments to map might look like this:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

9 answers


One argument that is repeated, one argument in c

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass
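A minimal runnable sketch of the repeat approach, assuming a hypothetical two-argument function add and sample values for a and c. Executor.map stops as soon as the shortest iterable is exhausted, so the infinite `repeat(a)` is safe here.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


def add(a, b):
    # hypothetical example function taking two positional arguments
    return a + b


a = 10
c = [1, 2, 3]

with ThreadPoolExecutor() as executor:
    # repeat(a) is infinite, but map stops at the shortest iterable (c)
    results = list(executor.map(add, repeat(a), c))

print(results)  # [11, 12, 13]
```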

Need to unpack the items of c, and can unpack c

# Python 3: the built-in zip replaces itertools.izip
for result in executor.map(f, *zip(*c)):
    pass
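A small sketch of what `*zip(*c)` does, assuming a hypothetical two-argument f and a list of pairs for c: the pairs are transposed into one tuple per argument position, which is exactly the shape Executor.map expects. Note that zip(*c) consumes all of c up front, so this suits finite inputs.

```python
from concurrent.futures import ThreadPoolExecutor


def f(a, b):
    # hypothetical function taking two positional arguments
    return a * b


c = [(1, 2), (3, 4), (5, 6)]

# zip(*c) transposes the pairs into one tuple per argument position:
# (1, 3, 5) for a and (2, 4, 6) for b
columns = list(zip(*c))

with ThreadPoolExecutor() as executor:
    results = list(executor.map(f, *columns))

print(columns)  # [(1, 3, 5), (2, 4, 6)]
print(results)  # [2, 12, 30]
```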

Need to unpack the items of c, but can't unpack c

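  1. Change f to take a single argument and unpack the arguments inside the function.
  2. If each item in c has a variable number of members, or you're calling f only a few times: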

    executor.map(lambda args, f=f: f(*args), c)
    

    This defines a new function that unpacks each item from c and calls f. Using a default argument for f in the lambda makes f local inside the lambda and thus reduces lookup time.

  3. If you have a fixed number of arguments and need to call f many times:

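    from collections import deque
    def itemtee(iterable, n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    try:
                        extend(next(it))
                    except StopIteration:
                        # Python 3.7+ (PEP 479): finish with return instead of
                        # letting StopIteration escape, which raises RuntimeError
                        return
                yield popleft()
        # the same generator object n times, so zip pulls n consecutive items
        return [gen()] * n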
    
    executor.map(f, *itemtee(c, n))
    

    where n is the number of arguments to f. This is adapted from itertools.tee.
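Option 1 can be sketched as follows, assuming a hypothetical function f_packed that accepts a whole tuple and unpacks it itself, so no wrapper is needed at the call site.

```python
from concurrent.futures import ThreadPoolExecutor


def f_packed(item):
    # hypothetical: take the whole tuple and unpack it inside the function
    a, b, c = item
    return a + b + c


items = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with ThreadPoolExecutor() as executor:
    results = list(executor.map(f_packed, items))

print(results)  # [6, 15, 24]
```

A module-level function like this can also be pickled, so the same call works with ProcessPoolExecutor, whereas a lambda wrapper cannot be pickled.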

answered 2011-08-08T00:25:10.483

You need to remove the * in the map call:

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

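This calls f once for each item in args (that is, len(c) times; a generator itself has no len()), where f should accept a single argument.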

If you want f to accept two arguments, you can unpack each tuple with a lambda, for example:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass
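A runnable version of the lambda approach, assuming a hypothetical two-argument f whose later calls finish first. Executor.map yields results in the order of the input iterable, not in completion order, so results[i] always corresponds to the i-th tuple in args.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def f(a, b):
    # hypothetical function; later items sleep less, so they finish first
    time.sleep(0.01 * (3 - b))
    return (a, b)


a = "x"
c = [0, 1, 2]
args = ((a, b) for b in c)

with ThreadPoolExecutor(max_workers=3) as executor:
    # (*p) unpacks each tuple into f's two positional parameters
    results = list(executor.map(lambda p: f(*p), args))

print(results)  # [('x', 0), ('x', 1), ('x', 2)]
```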
answered 2011-08-08T01:08:52.123

You can use currying to create a new function with functools.partial in Python:

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2):
    # some code
    ...

# curry some_func so that the argument 'a' is repeated in every call
func = partial(some_func, a)
with ThreadPoolExecutor() as executor:
    for result in executor.map(func, list_of_args):
        ...

If you need to fix more than one argument, you can pass them all to partial:

func = partial(some_func, a, b, c)
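A small sketch of fixing several leading arguments with functools.partial, assuming a hypothetical four-parameter some_func where only the last argument varies across calls. partial also accepts keyword arguments, shown here with a hypothetical scale parameter.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(a, b, c, d, scale=1):
    # hypothetical function: a, b, c are fixed; d varies per call
    return (a + b + c + d) * scale


# fix a=1, b=2, c=3 positionally and scale=10 by keyword
func = partial(some_func, 1, 2, 3, scale=10)

with ThreadPoolExecutor() as executor:
    results = list(executor.map(func, [0, 1, 2]))

print(results)  # [60, 70, 80]
```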
answered 2018-10-26T20:05:24.853

So, suppose you have a function with 3 arguments, all of which are dynamic and change with every call. For example:

def multiply(a,b,c):
    print(a * b * c)

To call it many times using threads, I would first create a list of tuples, where each tuple is one set of values for a, b, c:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

You might expect concurrent.futures' map to take the target function as its first argument and a list of argument tuples as its second, and so call it like this:

for _ in executor.map(multiply, arguments):  # Error
    pass

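But map actually takes one iterable per positional argument, so each tuple is passed as a single argument and you get a TypeError (multiply() missing 2 required positional arguments: 'b' and 'c'). To fix this, we create a helper function: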

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

Now we can call this function with the executor like this:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    for _ in executor.map(helper, arguments):
        pass

This should give you the desired result.

answered 2019-12-05T10:58:48.670

Here is a code snippet showing how to send multiple arguments to a function with ThreadPoolExecutor:

import concurrent.futures


def hello(first_name: str, last_name: str) -> None:
    """Prints a friendly hello with first name and last name"""
    print('Hello %s %s!' % (first_name, last_name))


def main() -> None:
    """Examples showing how to use ThreadPoolExecutor and executor.map
    to send multiple arguments to a function"""

    # Example 1: Sending multiple arguments using tuples
    # Define tuples with sequential arguments to be passed to hello()
    args_names = (
        ('Bruce', 'Wayne'),
        ('Clark', 'Kent'),
        ('Diana', 'Prince'),
        ('Barry', 'Allen'),
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the tuple (*f) into hello(*args)
        executor.map(lambda f: hello(*f), args_names)

    print()

    # Example 2: Sending multiple arguments using dict with named keys
    # Define dicts with arguments as key names to be passed to hello()
    kwargs_names = (
        {'first_name': 'Bruce', 'last_name': 'Wayne'},
        {'first_name': 'Clark', 'last_name': 'Kent'},
        {'first_name': 'Diana', 'last_name': 'Prince'},
        {'first_name': 'Barry', 'last_name': 'Allen'},
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the dict (**f) into hello(**kwargs)
        executor.map(lambda f: hello(**f), kwargs_names)


if __name__ == '__main__':
    main()
answered 2021-04-30T22:54:56.077

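For ProcessPoolExecutor.map():

Similar to map(func, *iterables) except:

the iterables are collected immediately rather than lazily;

func is executed asynchronously and several calls to func may be made concurrently.

Try running the following snippet under Python 3 and it will become clear: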

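from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    print(a+b)

if __name__ == '__main__':  # required where workers are spawned (e.g. Windows, macOS)
    with ProcessPoolExecutor() as pool:
        # map stops at the shortest iterable, so f runs only 3 times
        pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

    # 0, 2, 4 (print order may vary between processes)

    array = [(i, i) for i in range(3)]
    with ProcessPoolExecutor() as pool:
        # zip(*array) turns the pairs into one iterable per argument of f
        pool.map(f, *zip(*array))

    # 0, 2, 4 (print order may vary between processes)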
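Because the argument layout is the same as for the built-in map, a cheap way to check which calls a pool will make is to run the same arguments through plain map first. A minimal sketch using a returning variant of f (so the results can be inspected), with no worker processes involved:

```python
def f(a, b):
    # returning variant of the answer's f, for inspection
    return a + b


array = [(i, i) for i in range(3)]

# same argument layouts as the ProcessPoolExecutor example,
# run through the built-in map instead of a process pool
truncated = list(map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2)))
transposed = list(map(f, *zip(*array)))

print(truncated)   # [0, 2, 4]
print(transposed)  # [0, 2, 4]
```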
answered 2019-05-30T09:43:18.807

I see many answers here, but none is as straightforward as using a lambda expression:

def foo(x, y):
    pass

Want to call the above method 10 times with the same values, i.e. xVal and yVal?

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    for _ in executor.map(lambda _: foo(xVal, yVal), range(0, 10)):
        pass
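An alternative sketch that avoids the dummy lambda parameter: itertools.repeat(value, 10) yields the same value exactly 10 times, so each iterable supplies one argument to every call. foo, xVal and yVal are hypothetical stand-ins here.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


def foo(x, y):
    # hypothetical stand-in for the answer's foo
    return x + y


xVal, yVal = 2, 3

with ThreadPoolExecutor() as executor:
    # repeat(value, 10) yields value exactly 10 times, so foo runs 10 times
    results = list(executor.map(foo, repeat(xVal, 10), repeat(yVal, 10)))

print(results)  # [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
```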
answered 2019-11-07T19:31:35.250

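Suppose you have data like the DataFrame shown below, and you want to pass its first two columns to a function that reads each image, extracts its features, and then computes and returns the difference.

Note: your scenario can be anything that fits your requirements; define the function accordingly.

The snippet below takes these two columns as arguments and passes them to the thread pool (also showing a progress bar).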

[image: the example DataFrame]

import numpy as np
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm


# function that returns the difference of two numpy feature matrices
def getDifference(image_1_loc, image_2_loc, esp=1e-7):
    # extract_features is a hypothetical helper: read the image and extract its features
    arr1 = extract_features(image_1_loc)
    arr2 = extract_features(image_2_loc)
    diff = arr1.ravel() - arr2.ravel() + esp
    return diff


# Using ThreadPoolExecutor from concurrent.futures with multiple arguments
with ThreadPoolExecutor() as executor:
    pairs = [(i, j) for i, j in df[['image_1', 'image_2']].values]
    result = np.array(
        list(tqdm(
            executor.map(lambda x: getDifference(*x), pairs),
            total=len(df),
        ))
    )


answered 2021-06-29T09:53:26.470

Below is a simple utility I always use.

########### Start of Utility Code ###########

import os
import sys
import traceback

from concurrent import futures
from functools import partial


def catch(fn):
    def wrap(*args, **kwargs):
        result = None
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            type_, value_, traceback_ = sys.exc_info()
            return None, (
                args,
                "".join(traceback.format_exception(type_, value_, traceback_)),
            )
        else:
            return result, (args, None)

    return wrap


def top_level_wrap(fn, arg_tuple):
    args, kwargs = arg_tuple
    return fn(*args, **kwargs)  # ** passes kwargs as keyword arguments


def create_processes(fn, values, handle_error, handle_success):
    cores = os.cpu_count()
    max_workers = 2 * cores + 1

    to_exec = partial(top_level_wrap, fn)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for result, error in executor.map(to_exec, values):
            args, tb = error
            if tb is not None:
                handle_error(args, tb)
            else:
                handle_success(result)


########### End of Utility Code ###########

Example usage:

######### Start of example usage ###########

import time


@catch
def fail_when_5(val):
    time.sleep(val)
    if val == 5:
        raise Exception("Error - val was 5")
    else:
        return f"No error val is {val}"


def handle_error(args, tb):
    print("args is", args)
    print("TB is", tb)


def top_level(val, val_2, test=None, test2="ok"):
    print(val_2, test, test2)
    return fail_when_5(val)

handle_success = print


if __name__ == "__main__":
    # SHAPE -> ( (args, kwargs), (args, kwargs), ... )
    values = tuple(
        ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
    )
    create_processes(top_level, values, handle_error, handle_success)

######### End of example usage ###########
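The utility above catches exceptions inside each call. For context, Executor.map on its own re-raises a call's exception when that result is retrieved from the iterator, which ends the loop at the first failure. A minimal sketch with a hypothetical fail_when_2, using a ThreadPoolExecutor to keep it short:

```python
from concurrent.futures import ThreadPoolExecutor


def fail_when_2(val):
    # hypothetical: raise for one specific input
    if val == 2:
        raise ValueError("val was 2")
    return val * 10


collected = []
error = None

with ThreadPoolExecutor() as executor:
    try:
        for result in executor.map(fail_when_2, range(5)):
            collected.append(result)
    except ValueError as exc:
        # the exception surfaces while iterating, at the failing item's position
        error = exc

print(collected)  # [0, 10]
print(error)      # val was 2
```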
answered 2022-01-29T01:15:30.167