30

我想要类似的东西executor.map,除了当我迭代结果时,我想根据完成的顺序对它们进行迭代,例如首先完成的工作项应该首先出现在迭代中,等等。这样迭代就会如果序列中的每个工作项尚未完成,则阻止。

我知道如何自己使用队列来实现这一点,但我想知道是否可以使用该futures框架。

(我主要使用基于线程的执行器,所以我想要一个适用于这些的答案,但也欢迎一般性的答案。)

更新:感谢您的回答!你能解释一下我如何使用as_completedwithexecutor.map吗?executor.map是我使用期货时最有用和最简洁的工具,我不愿意Future手动开始使用对象。

4

3 回答 3

53

executor.map()和 builtin 一样map(),只按照可迭代的顺序返回结果,所以很遗憾你不能用它来确定完成的顺序。concurrent.futures.as_completed()是您正在寻找的 - 这是一个示例:

import time
import concurrent.futures

times = [3, 1, 2]

def sleeper(secs):
    time.sleep(secs)
    print('I slept for {} seconds'.format(secs))
    return secs

# returns in the order given
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    print(list(executor.map(sleeper, times)))

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [3, 1, 2]

# returns in the order completed
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futs = [executor.submit(sleeper, secs) for secs in times]
    print([fut.result() for fut in concurrent.futures.as_completed(futs)])

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [1, 2, 3]

当然,如果您需要使用地图接口,您可以创建自己的map_as_completed()函数来封装上述内容(可能将其添加到子类中Executor()),但我认为通过创建期货实例executor.submit()是一种更简单/更清洁的方式(也允许您提供无参数,kwargs)。

于 2013-05-04T16:59:31.207 回答
1

并发期货根据完成时间返回一个迭代器——这听起来正是你要找的。

http://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.as_completed

如果您对实施有任何困惑或困难,请告诉我。

于 2013-05-02T04:42:56.287 回答
0

来自 python 文档

concurrent.futures.as_completed(fs, timeout=None)¶ 

返回由 fs 给出的 Future 实例(可能由不同的 Executor 实例创建)的迭代器,该迭代器在它们完成(完成或被取消)时产生期货。在调用 as_completed() 之前完成的任何期货都将首先产生。如果调用next () 并且在从原始调用 as_completed() 的超时秒数后结果不可用,则返回的迭代器会引发 TimeoutError 。timeout 可以是 int 或 float。如果未指定 timeout 或 None,则等待时间没有限制。

您需要了解executor.map()和之间的区别executor.submit()。第一个将函数映射到参数向量。它与 非常相似map,但异步启动任务。submit(func, arg)在每次调用时启动一项任务。在此任务中,func适用于arg.

这是一个可以在 python 3.0 上运行as_completed()的示例submit()

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()

此处使用no map(),任务运行时使用submitandas_completed()

返回 fs 给出的 Future 实例的迭代器,当它们完成(完成或被取消)时产生期货。

于 2013-05-03T09:56:36.303 回答