我第一次将 asyncio 与 httpx.AsyncClient 一起使用,并试图弄清楚当其中一些任务可能失败时如何完成我的任务列表。我正在使用我在几个地方找到的模式,在这些地方我用协程函数填充了一个 asyncio 队列,并有一组从 asyncio.gather 内部排队的工作进程。通常,如果执行工作的函数引发异常,您将看到整个脚本在该处理期间失败,并报告异常以及 a RuntimeWarning: coroutine foo was never awaited
,表明您从未完成您的列表。
我找到了return_exceptions
asyncio.gather 的选项,这有所帮助,但并不完全。在我得到异常的次数与我在调用gather
. 以下是演示该问题的简单脚本。
from httpx import AsyncClient, Timeout
from asyncio import run, gather, Queue as asyncio_Queue
from random import choice
async def process_url(client, url):
"""
opens the URL and pulls a header attribute
randomly raises an exception to demonstrate my problem
"""
if choice([True, False]):
await client.get(url)
print(f'retrieved url {url}')
else:
raise AssertionError(f'generated error for url {url}')
async def main(worker_count, urls):
"""
orchestrates the workers that call process_url
"""
httpx_timeout = Timeout(10.0, read=20.0)
async with AsyncClient(timeout=httpx_timeout, follow_redirects=True) as client:
tasks = asyncio_Queue(maxsize=0)
for url in urls:
await tasks.put(process_url(client, url))
async def worker():
while not tasks.empty():
await tasks.get_nowait()
results = await gather(*[worker() for _ in range(worker_count)], return_exceptions=True)
return results
if __name__ == '__main__':
urls = ['https://stackoverflow.com/questions',
'https://stackoverflow.com/jobs',
'https://stackoverflow.com/tags',
'https://stackoverflow.com/users',
'https://www.google.com/',
'https://www.bing.com/',
'https://www.yahoo.com/',
'https://www.foxnews.com/',
'https://www.cnn.com/',
'https://www.npr.org/',
'https://www.opera.com/',
'https://www.mozilla.org/en-US/firefox/',
'https://www.google.com/chrome/',
'https://www.epicbrowser.com/'
]
print(f'processing {len(urls)} urls')
run_results = run(main(4, urls))
print('\n'.join([str(rr) for rr in run_results]))
此脚本的一次运行输出:
processing 14 urls
retrieved url https://stackoverflow.com/tags
retrieved url https://stackoverflow.com/jobs
retrieved url https://stackoverflow.com/users
retrieved url https://www.bing.com/
generated error for url https://stackoverflow.com/questions
generated error for url https://www.foxnews.com/
generated error for url https://www.google.com/
generated error for url https://www.yahoo.com/
sys:1: RuntimeWarning: coroutine 'process_url' was never awaited
Process finished with exit code 0
在这里,您看到我们通过了总共 14 个 url 中的 8 个,但是当我们遇到 4 个错误时,脚本结束并忽略了其余的 url。
我想要做的是让脚本完成完整的 url 集,但最后告诉我错误。有没有办法在这里做到这一点?可能是我必须将所有内容包装在process_url()
一个try/except
块中并使用 aiofile 之类的东西最终将它们转储出来?
更新 需要明确的是,这个演示脚本是对我真正在做的事情的简化。我的真实脚本是几十万次访问少量服务器 api 端点。使用一组工作人员的目的是避免压倒我正在访问的服务器[它是测试服务器,而不是生产服务器,因此它不打算处理大量请求,尽管数量大于 4 8-)]。我愿意学习替代品。