0

我有多个 api 路由,它们通过单独查询数据库来返回数据。

现在我正在尝试构建在 api 之上查询的仪表板。我应该如何将 api 调用放入队列中,以便它们异步执行?

我试过了

await queue.put({'response_1': await api_1(**kwargs), 'response_2': await api_2(**kwargs)})

似乎在将任务放入队列时返回了数据。

现在我正在使用

await queue.put(('response_1', api_1(**args_dict)))

在生产者和消费者中,我正在解析元组并进行 api 调用,我认为我做错了。

Question1 有没有更好的方法?

这是我用来创建任务的代码

producers = [create_task(producer(**args_dict, queue)) for row in stats]
consumers = [create_task(consumer(queue)) for row in stats]
await gather(*producers)
await queue.join()
for con in consumers:
    con.cancel()

Question2我应该使用create_task 还是ensure_future?对不起,如果它是重复的,但我无法理解其中的区别,在网上搜索后我变得更加困惑。

我正在使用 FastAPI、数据库(异步)包。

我正在使用元组而不是字典,例如 await queue.put('response_1', api_1(**kwargs))

./app/dashboard.py:90: RuntimeWarning: coroutine 'api_1' was never awaited
item: Tuple = await queue.get_nowait()

我的消费者代码是

async def consumer(return_obj: dict, que: Queue):
    item: Tuple = await queue.get_nowait()
    print(f'consumer took {item[0]} from queue')
    return_obj.update({f'{item[0]}': await item[1]})
    await queue.task_done()

如果我不使用 get_nowait 消费者会卡住,因为队列可能是空的,但是如果我使用 get_nowait,则会显示上面的错误。我没有定义最大队列长度

- - - - - -编辑 - - - - - -

制片人

async def producer(queue: Queue, **kwargs):
    await queue.put('response_1', api_1(**kwargs))
4

1 回答 1

1

await您可以从第一个片段中删除并在队列中发送协程对象。协程对象是被调用但尚未等待的协程。

# producer:
await queue.put({'response_1': api_1(**kwargs),
                 'response_2': api_2(**kwargs)})
...

# consumer:
while True:
    dct = await queue.get()
    for name, api_coro in dct:
        result = await api_coro
        print('result of', name, ':', result)

我应该使用create_taskorensure_future吗?

如果参数是调用协程函数的结果,则应使用create_task(请参阅Guido 的此评论以获取解释)。顾名思义,它将返回一个Task驱动该协程的实例。该任务也可以等待,但它会继续在后台运行。

ensure_future是一个更专业的函数,它将各种等待对象转换为它们相应的未来。它在实现诸如asyncio.gather()为了方便而接受不同类型的等待对象的功能时很有用,并且需要在使用它们之前将它们转换为期货。

于 2020-01-19T23:19:15.187 回答