我在网上清理了几个例子并想出了这个。这段代码有效,但我想知道是否有更好的方法来做到这一点?
user.py:
from asyncpg import create_pool
from sanic import Blueprint
bp = Blueprint('dp')
class pg:
def __init__(self, pg_pool):
self.pg_pool = pg_pool
async def fetch(self, sql, *args, **kwargs):
async with self.pg_pool.acquire() as connection:
return await connection.fetch(sql, *args, **kwargs)
async def execute(self, sql, *args, **kwargs):
async with self.pg_pool.acquire() as connection:
return await connection.execute(sql, *args, **kwargs)
@bp.listener('before_server_start')
async def init_pg(app, loop):
"""
Init Postgresql DB.
"""
bp.pg_pool = await create_pool(
**app.config.PG_CFG,
max_inactive_connection_lifetime=60,
min_size=1,
max_size=3,
loop=loop,
)
bp.pg = pg(bp.pg_pool)
print('-------- setup connection pool --------')
现在在 webapp.py 中使用 pg 类
webapp.py:
@app.route("/")
async def root(req):
result = await app.pg.fetch('SELECT * FROM foo')