2

我第一次将 asyncio 与 httpx.AsyncClient 一起使用,并试图弄清楚当其中一些任务可能失败时如何完成我的任务列表。我正在使用我在几个地方找到的模式,在这些地方我用协程函数填充了一个 asyncio 队列,并有一组从 asyncio.gather 内部排队的工作进程。通常,如果执行工作的函数引发异常,您将看到整个脚本在该处理期间失败,并报告异常以及 a RuntimeWarning: coroutine foo was never awaited,表明您从未完成您的列表。

我找到了return_exceptionsasyncio.gather 的选项,这有所帮助,但并不完全。在我得到异常的次数与我在调用gather. 以下是演示该问题的简单脚本。

from httpx import AsyncClient, Timeout
from asyncio import run, gather, Queue as asyncio_Queue
from random import choice


async def process_url(client, url):
    """
    opens the URL and pulls a header attribute
    randomly raises an exception to demonstrate my problem
    """
    if choice([True, False]):
        await client.get(url)
        print(f'retrieved url {url}')
    else:
        raise AssertionError(f'generated error for url {url}')


async def main(worker_count, urls):
    """
    orchestrates the workers that call process_url
    """
    httpx_timeout = Timeout(10.0, read=20.0)
    async with AsyncClient(timeout=httpx_timeout, follow_redirects=True) as client:
        tasks = asyncio_Queue(maxsize=0)
        for url in urls:
            await tasks.put(process_url(client, url))

        async def worker():
            while not tasks.empty():
                await tasks.get_nowait()

        results = await gather(*[worker() for _ in range(worker_count)], return_exceptions=True)
        return results

if __name__ == '__main__':
    urls = ['https://stackoverflow.com/questions',
            'https://stackoverflow.com/jobs',
            'https://stackoverflow.com/tags',
            'https://stackoverflow.com/users',
            'https://www.google.com/',
            'https://www.bing.com/',
            'https://www.yahoo.com/',
            'https://www.foxnews.com/',
            'https://www.cnn.com/',
            'https://www.npr.org/',
            'https://www.opera.com/',
            'https://www.mozilla.org/en-US/firefox/',
            'https://www.google.com/chrome/',
            'https://www.epicbrowser.com/'
            ]
    print(f'processing {len(urls)} urls')
    run_results = run(main(4, urls))
    print('\n'.join([str(rr) for rr in run_results]))

此脚本的一次运行输出:

processing 14 urls
retrieved url https://stackoverflow.com/tags
retrieved url https://stackoverflow.com/jobs
retrieved url https://stackoverflow.com/users
retrieved url https://www.bing.com/
generated error for url https://stackoverflow.com/questions
generated error for url https://www.foxnews.com/
generated error for url https://www.google.com/
generated error for url https://www.yahoo.com/
sys:1: RuntimeWarning: coroutine 'process_url' was never awaited

Process finished with exit code 0

在这里,您看到我们通过了总共 14 个 url 中的 8 个,但是当我们遇到 4 个错误时,脚本结束并忽略了其余的 url。

我想要做的是让脚本完成完整的 url 集,但最后告诉我错误。有没有办法在这里做到这一点?可能是我必须将所有内容包装在process_url()一个try/except块中并使用 aiofile 之类的东西最终将它们转储出来?

更新 需要明确的是,这个演示脚本是对我真正在做的事情的简化。我的真实脚本是几十万次访问少量服务器 api 端点。使用一组工作人员的目的是避免压倒我正在访问的服务器[它是测试服务器,而不是生产服务器,因此它不打算处理大量请求,尽管数量大于 4 8-)]。我愿意学习替代品。

4

1 回答 1

0

您概述的程序设计应该可以正常工作,但您必须防止任务(worker函数的实例)崩溃。下面的清单显示了一种方法。

您的 Queue 名为“tasks”,但您放入其中的项目不是任务 - 它们是coroutines。就目前而言,您的程序有五个任务:其中一个是main函数,它由 asyncio.run() 制成一个任务。其他四个任务是 的实例worker,由 asyncio.gather 制成任务。

worker在协程上等待并且该协程崩溃时,异常会传播到workerawait 语句中。因为异常没有处理,worker会依次崩溃。为防止这种情况,请执行以下操作:

async def worker():
    while not tasks.empty():
        try:
            await tasks.get_nowait()
        except Exception:
            pass
            # You might want to do something more intelligent here
            # (logging, perhaps), rather than simply suppressing the exception

这应该允许您的示例程序运行完成。

于 2021-12-31T23:09:51.350 回答