93

我只是对我编写的一些代码感到非常困惑。我惊讶地发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个生成一个f返回任何类型的列表,第二个生成一个concurrent.futures.Future对象列表,然后需要使用它们的result()方法评估这些对象以获得f返回的值。

我主要担心的是这意味着executor.map不能利用concurrent.futures.as_completed,这似乎是一种非常方便的方法来评估我正在对数据库进行的一些长期运行调用的结果,因为它们变得可用。

我完全不清楚concurrent.futures.ThreadPoolExecutor对象是如何工作的——天真地,我更喜欢(有点冗长):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

更简洁executor.map,以便利用可能的性能增益。我这样做有错吗?

4

4 回答 4

51

问题是您将结果转换ThreadPoolExecutor.map为列表。如果您不这样做,而是直接迭代生成的生成器,结果仍会按原始顺序生成,但循环会在所有结果准备好之前继续。您可以使用此示例进行测试:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

保留顺序的原因可能是因为有时以与您给它们映射的顺序相同的顺序获得结果很重要。并且结果可能不会包含在未来的对象中,因为在某些情况下,如果您需要它们,可能需要太长时间才能在列表上执行另一个映射以获取所有结果。毕竟在大多数情况下,下一个值很可能在循环处理第一个值之前就准备好了。这在这个例子中得到了证明:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

在此示例中,可能do_some_stuff需要更长的时间crunch_number,如果确实如此,那么在您仍然保持 map 的简单使用的同时,性能确实不会有很大损失。

此外,由于工作线程(/进程)从列表的开头开始处理并一直工作到您提交的列表的末尾,因此结果应该按照迭代器已经产生的顺序完成。这意味着在大多数情况下executor.map都很好,但在某些情况下,例如,如果您处理值的顺序无关紧要,并且您传递给的函数map需要非常不同的时间来运行,则future.as_completed可能会更快。

于 2013-12-30T23:03:58.403 回答
23

如果使用concurrent.futures.as_completed,则可以处理每个函数的异常。

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('resutl is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

在 中executor.map,如果有异常,整个执行器会停止。您需要在工作函数中处理异常。

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for each in executor.map(f, iterable):
        print(each)
# if there is any exception, executor.map would stop
于 2020-01-08T02:37:24.557 回答
19

下面是一个.submit()vs的例子.map()。他们都立即接受工作(提交|映射 - 开始)。它们需要相同的时间来完成,11 秒(最后结果时间 - 开始)。但是,.submit()一旦ThreadPoolExecutor maxThreads=2完成中的任何线程(无序!),就会给出结果。虽然.map()按照提交的顺序给出结果。

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()    

输出:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49
于 2018-11-29T06:52:48.483 回答
18

除了此处答案中的解释之外,直接找到源代码可能会有所帮助。它重申了此处另一个答案的声明:


.map()在基类中定义concurrent.futures._base.Executor

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

正如您所提到的,还有.submit(), 需要在子类中定义,即ProcessPoolExecutorand ThreadPoolExecutor,并返回一个_base.Future您需要调用.result()以实际执行任何操作的实例。

重要的几行.map()归结为:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse().pop()是使第一个提交的结果(来自iterables)首先产生,第二个提交的结果第二个产生的方法,依此类推。结果迭代器的元素不是Futures;它们本身就是实际结果。

于 2018-12-22T04:39:30.883 回答