我有多个 api 路由,它们通过单独查询数据库来返回数据。
现在我正在尝试构建在 api 之上查询的仪表板。我应该如何将 api 调用放入队列中,以便它们异步执行?
我试过了
await queue.put({'response_1': await api_1(**kwargs), 'response_2': await api_2(**kwargs)})
似乎在将任务放入队列时返回了数据。
现在我正在使用
await queue.put(('response_1', api_1(**args_dict)))
在生产者和消费者中,我正在解析元组并进行 api 调用,我认为我做错了。
Question1 有没有更好的方法?
这是我用来创建任务的代码
producers = [create_task(producer(**args_dict, queue)) for row in stats]
consumers = [create_task(consumer(queue)) for row in stats]
await gather(*producers)
await queue.join()
for con in consumers:
con.cancel()
Question2我应该使用create_task 还是ensure_future?对不起,如果它是重复的,但我无法理解其中的区别,在网上搜索后我变得更加困惑。
我正在使用 FastAPI、数据库(异步)包。
我正在使用元组而不是字典,例如 await queue.put('response_1', api_1(**kwargs))
./app/dashboard.py:90: RuntimeWarning: coroutine 'api_1' was never awaited
item: Tuple = await queue.get_nowait()
我的消费者代码是
async def consumer(return_obj: dict, que: Queue):
item: Tuple = await queue.get_nowait()
print(f'consumer took {item[0]} from queue')
return_obj.update({f'{item[0]}': await item[1]})
await queue.task_done()
如果我不使用 get_nowait 消费者会卡住,因为队列可能是空的,但是如果我使用 get_nowait,则会显示上面的错误。我没有定义最大队列长度
- - - - - -编辑 - - - - - -
制片人
async def producer(queue: Queue, **kwargs):
await queue.put('response_1', api_1(**kwargs))