我这样做的方式是在 db.py 中。
class Database:
def __init__(self,user,password,host,database,port="5432"):
self.user = user
self.password = password
self.host = host
self.port = port
self.database = database
self._cursor = None
self._connection_pool = None
async def connect(self):
if not self._connection_pool:
try:
self._connection_pool = await asyncpg.create_pool(
min_size=1,
max_size=20,
command_timeout=60,
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
ssl="require"
)
logger.info("Database pool connectionn opened")
except Exception as e:
logger.exception(e)
async def fetch_rows(self, query: str,*args):
if not self._connection_pool:
await self.connect()
else:
con = await self._connection_pool.acquire()
try:
result = await con.fetch(query,*args)
return result
except Exception as e:
logger.exception(e)
finally:
await self._connection_pool.release(con)
async def close(self):
if not self._connection_pool:
try:
await self._connection_pool.close()
logger.info("Database pool connection closed")
except Exception as e:
logger.exception(e)
然后在应用程序中
@app.on_event("startup")
async def startup_event():
database_instance = db.Database(**db_arguments)
await database_instance.connect()
app.state.db = database_instance
logger.info("Server Startup")
@app.on_event("shutdown")
async def shutdown_event():
if not app.state.db:
await app.state.db.close()
logger.info("Server Shutdown")
然后,您可以通过在路由中传入请求参数来获取带有 request.app.state.db 的数据库实例。