1

我想在 Tornado 应用程序中使用aioredis 。但是,我想不出一种方法来实现其资源的异步启动和关闭,因为 Application 类没有 ASGI Lifespan事件,例如 Quart 或 FastAPI。换句话说,我需要在应用程序开始服务请求之前创建一个 Redis 池,并在应用程序完成或即将结束后立即释放该池。问题是 aioredis pool 创建是异步的,而 Tornado Application 创建是同步的。

基本应用程序如下所示:

    import os

from aioredis import create_redis_pool
from aioredis.commands import Redis
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application

from .handlers import hello

redis: Redis = None


async def start_resources() -> None:
    '''
    Initialize resources such as Redis and Database connections
    '''
    global redis
    REDIS_HOST = os.environ['REDIS_HOST']
    REDIS_PORT = os.environ['REDIS_PORT']
    redis = await create_redis_pool((REDIS_HOST, REDIS_PORT), encoding='utf-8')


async def close_resources() -> None:
    '''
    Release resources
    '''
    redis.close()
    await redis.wait_closed()


def create_app() -> Application:
    app = Application([
        ("/hello", hello.HelloHandler),
    ])

    return app


if __name__ == '__main__':

    app = create_app()
    http_server = HTTPServer(app)
    http_server.listen(8000)
    IOLoop.current().start()

重要的是我也可以在测试期间使用启动和关闭功能。

有任何想法吗?

4

2 回答 2

1

要创建池,请在开始循环run_sync 之前调用您的协程:

if __name__ == '__main__':
    IOLoop.current().run_sync(start_resources)
    ...

要在程序退出之前销毁池,请使用try...finally块以便还考虑由于未处理的异常而导致的突然退出:

if __name__ == '__main__': 
    # create db pool
    IOLoop.current().run_sync(start_resources)

    ...

    try:
        # start the loop
        IOLoop.current().start()
    except:
        pass
    finally:
        # this will close the pool before exiting
        IOLoop.current().run_sync(close_resources)
于 2020-06-15T08:20:02.247 回答
0

xyres的回应是正确的,让我走上了正轨。我只认为它可以改进一点,所以我发布了这个替代方案:

from contextlib import contextmanager

# ... previous code omitted for brevity


@contextmanager
def create_app() -> Application:
    IOLoop.current().run_sync(start_resources)
    try:
        app = Application([
            ("/hello", hello.HelloHandler),
        ])
        yield app
    finally:
        IOLoop.current().run_sync(close_resources)


if __name__ == '__main__':
    with create_app() as app:
        http_server = HTTPServer(app)
        http_server.listen(8000)
        IOLoop.current().start()

此外,要在测试中使用此代码pytestand pytest-tornado,您应该创建一个conftest.py像这样的文件:

from typing import Iterator

from pytest import fixture
from tornado.platform.asyncio import AsyncIOLoop
from tornado.web import Application

from app.main import create_app


@fixture
def app(io_loop: AsyncIOLoop) -> Iterator[Application]:
    '''
    Return a Tornado.web.Application object with initialized resources
    '''
    with create_app() as app:
        yield app

请注意,声明io_loop为依赖注入很重要。

于 2020-06-17T11:43:19.250 回答