0

是否可以将 asyncio.Queue 与Quart之类的网络服务器一起使用来在生产者和消费者之间进行通信?

这是我想要做的......

from quart import Quart, request
import asyncio

queue = asyncio.Queue()
producers = []
consumers = []


async def producer(mesg):
    print(f'produced {mesg}')
    await queue.put(mesg)
    await asyncio.sleep(1) # do some work


async def consumer():
    while True:
        token = await queue.get()
        await asyncio.sleep(1) # do some work
        queue.task_done()
        print(f'consumed {token}')


@app.route('/route', methods=['POST'])
async def index():
    mesg = await request.get_data()
    try:
        p = asyncio.create_task(producer(mesg))
        producers.append(p)
        c = asyncio.create_task(consumer())
        consumers.append(c)
        return f"published message {mesg}", 200
    except Exception as e:
        logger.exception("Failed tp publish message %s!", mesg)
        return f"Failed to publish message: {mesg}", 400

if __name__ == '__main__':
    PORT = int(os.getenv('PORT')) if os.getenv('PORT') else 8050
    app.run(host='0.0.0.0', port=PORT, debug=True)

这工作正常。但我不确定这是否是一个好习惯,因为我很困惑如何(在我的代码中)执行以下步骤。

# Making sure all the producers have completed
await asyncio.gather(*producers)

#wait for the remaining tasks to be processed
await queue.join()

# cancel the consumers, which are now idle
for c in consumers:
    c.cancel()

编辑-1:

我尝试使用@app.after_serving, 和一些logger.debug语句。

@app.after_serving
async def shutdown():
    logger.debug("Shutting down...")
    logger.debug("waiting for producers to finish...")
    await asyncio.gather(*producers)
    logger.debug("waiting for tasks to complete...")
    await queue.join()
    logger.debug("cancelling consumers...")
    for c in consumers:
        c.cancel()

hypercorn但是在正常关闭时不会打印调试语句。所以,我不确定在关闭@app.after_serving期间是否实际调用了装饰的函数(关闭)。

这是hypercorn关机期间的消息

appserver_1  | 2020-05-29 15:55:14,200 - base_events.py:1490 -        create_server - INFO - <Server sockets=(<asyncio.TransportSocket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8080)>,)> is serving
appserver_1  | Running on 0.0.0.0:8080 over http (CTRL + C to quit)
Gracefully stopping... (press Ctrl+C again to force)

我使用 akill -SIGTERM <PID>来表示进程正常关闭。

4

2 回答 2

0

我会将清理代码放在关闭after_serving函数中,

@app.after_serving
async def shutdown():
    # Making sure all the producers have completed
    await asyncio.gather(*producers)

    #wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

至于全局变量,我倾向于将它们直接存储在应用程序中,以便可以通过current_app代理访问它们。请注意,尽管这(和您的解决方案)仅适用于单个进程(工作人员),但如果您想使用多个工作人员(或等效的主机),您将需要第三方存储来存储此信息,例如使用 redis。

于 2020-05-29T12:54:15.107 回答
0

但我不确定这是否是一个好习惯

像您在示例中创建的全局变量通常不是企业解决方案中的良好实践。尤其是在 Python 中,当涉及到全局变量时,会有一些挑剔的行为。根据我的经验,将变量传递给函数或类是一种更简洁的方法。但是,我不知道如何在夸特中做到这一点,因为我不使用那个库。

# Making sure all the producers have completed
#wait for the remaining tasks to be processed
# cancel the consumers, which are now idle

通常在退出事件循环和退出应用程序之前完成清理任务。我不知道它是如何quart工作的,但你可以把这个逻辑放在后面,app.run()以便在事件循环停止后运行清理任务。这可能会因您的应用程序退出方式而异。检查文档以了解您可以挂钩的某种“关闭时”事件。

于 2020-05-28T14:37:31.290 回答