我想使用httpx从协程内的多个同时 HTTP 流请求中读取数据,并将数据返回给运行事件循环的非异步函数,而不仅仅是返回最终数据。
但是,如果我让我的异步函数产生而不是返回,我会收到抱怨,asyncio.as_completed()
并loop.run_until_complete()
期望协程或 Future,而不是异步生成器。
所以我可以让它工作的唯一方法是收集每个协程内的所有流数据,一旦请求完成就返回所有数据。然后收集所有协程结果,最后将其返回给非异步调用函数。
这意味着我必须将所有内容保存在内存中,并等到最慢的请求完成后才能获取所有数据,这违背了流式传输 http 请求的全部意义。
有什么办法可以完成这样的事情吗?我当前的愚蠢实现如下所示:
def collect_data(urls):
"""Non-async function wishing it was a non-async generator"""
async def stream(async_client, url, payload):
data = []
async with async_client.stream("GET", url=url) as ar:
ar.raise_for_status()
async for line in ar.aiter_lines():
data.append(line)
# would like to yield each line here
return data
async def execute_tasks(urls):
all_data = []
async with httpx.AsyncClient() as async_client:
tasks = [stream(async_client, url) for url in urls]
for coroutine in asyncio.as_completed(tasks):
all_data += await coroutine
# would like to iterate and yield each line here
return all_events
try:
loop = asyncio.get_event_loop()
data = loop.run_until_complete(execute_tasks(urls=urls))
return data
# would like to iterate and yield the data here as it becomes available
finally:
loop.close()
编辑:我也尝试了一些使用asyncio.Queue
和trio
内存通道的解决方案,但是由于我只能从异步范围内的那些中读取它并没有让我更接近解决方案
编辑 2:我想从非异步生成器中使用它的原因是我想从使用 Django Rest Framework 流 API 的 Django 应用程序中使用它。