2

我在后面运行aiohttp应用程序。在我的应用程序的初始化模块中,我不使用运行应用程序,而只是创建一个实例,该实例将被导入以在每个工作人员创建的过程中运行它。因此创建了一些工作进程,在其中创建事件循环,然后在这些循环中运行应用程序的请求处理程序。Gunicornnginxweb.run_app(app)GunicornGunicornGunicorn

我的aiohttp应用程序有一组已连接的WebSockets(移动应用程序客户端),我想在由Gunicorn. 我想通知所有 WebSockets连接到所有应用程序进程的人。因此,我使用创建某种上游代理,ZeroMQ并且我想使用zmq.SUB每个应用程序进程中的套接字订阅它。

...所以基本上我想在每个应用程序工作人员中实现这样的目标:

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')

while True:
    event = socket.recv()
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?

如何ZeroMQ在应用程序中侦听代理aiohttp以将消息转发到WebSockets

我可以在哪里将此代码放在事件循环中的后台运行以及如何在aiohttp应用程序的生命周期中正确运行和关闭它?


更新

我已经在 aiohttp 的 GitHub 存储库中创建了一个问题,描述了该问题并提出了可能的解决方案。我非常感谢您在这里或那里就所描述的问题提供意见。

4

1 回答 1

2

好的,关于这个问题的问题和讨论导致了我贡献的新功能aiohttp,即在版本1.0中,我们将能够on_startup使用方法注册应用程序信号Application.on_startup()

文档
主分支上的工作示例

#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio

import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app

async def websocket_handler(request):
    ws = WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'].append(ws)
    try:
        async for msg in ws:
            print(msg)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close(code=999, message='Server shutdown')


async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
            print("message in {}: {}".format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        await sub.unsubscribe(ch.name)
        await sub.quit()
        print('Redis connection closed.')


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    print('cleanup background tasks...')
    app['redis_listener'].cancel()
    await app['redis_listener']


async def init(loop):
    app = Application(loop=loop)
    app['websockets'] = []
    app.router.add_get('/news', websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)
于 2016-09-06T10:38:34.710 回答