4

我有一个要通过 concurrent.futures 的 ThreadPoolExecutor 下载的 url 列表,但可能有一些超时 url,我想在所有第一次尝试结束后重新下载它们。我不知道该怎么做,这是我的尝试,但因无休止的打印“time_out_again”而失败:

import concurrent.futures

def player_url(url):
    # here. if timeout, return 1. otherwise do I/O and return 0.
    ...

urls = [...]
time_out_futures = [] #list to accumulate timeout urls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = (executor.submit(player_url, url) for url in urls)
    for future in concurrent.futures.as_completed(future_to_url):
        if future.result() == 1:
            time_out_futures.append(future)

# here is what I try to deal with all the timeout urls       
while time_out_futures:
    future = time_out_futures.pop()
    if future.result() == 1:
        print('time_out_again')
        time_out_futures.insert(0,future)   # add back to the list

那么,有没有办法解决这个问题呢?

4

1 回答 1

2

Future对象只能使用一次。本身对Future返回结果的函数一无所知 -ThreadPoolExecutor对象负责创建Future、返回并在后台运行函数:

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)  # sefl.fn is play_url in your case
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)  # The result is set on the Future

正如你所看到的,当函数完成时,结果被设置在Future对象上。因为对象实际上对提供Future结果的函数一无所知,所以无法尝试使用该对象重新运行该函数。您所能做的就是在超时发生时返回,然后将 url 重新发送到:Futureurl1submitThreadPoolExecutor

def player_url(url):
    # here. if timeout, return 1. otherwise do I/O and return 0.
    ...
    if timeout:
        return (1, url)
    else:
        return (0, url)

urls = [...]
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    while urls:
        future_to_url = executor.map(player_url, urls)
        urls = []  # Clear urls list, we'll re-add any timed out operations.
        for future in future_to_url:
            if future.result()[0] == 1:
                urls.append(future.result()[1]) # stick url into list
于 2014-08-20T17:30:59.083 回答