1

我想用下面的方法初始化类的时候组织一个连接池

import asyncio
import asyncpg


class DBCommands:

    def __init__(self, uri: str) -> None:
        loop = asyncio.get_event_loop()
    self.pool: asyncpg.pool.Pool = loop.run_until_complete(asyncpg.create_pool(dsn=uri))

    async def get_id_admins(self) -> list:
        async with self.pool.acquire():
            result = await self.pool.fetch("SELECT chat_id FROM users WHERE role_user = 'admin'")
        admins_id = [row[0] for row in result]
        return admins_id

由于池应该是一个,使用上述实现,这是行不通的。我决定使用单例,但我不明白如何实现它。下面是我想出的版本。告诉我如何最好地解决这个问题。此外,我不明白如何最好以及在哪里关闭连接。我是使用模式的新手,刚刚开始学习 OOP。

import asyncio
import asyncpg

class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class DBManager(metaclass=Singleton):

    @classmethod
    def connect(cls, uri):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(asyncpg.create_pool(dsn=uri))


class DBCommands:

    def __init__(self, uri) -> None:
        self.uri = uri
        self.pool = DBManager.connect(uri)

    async def get_id_admins(self) -> list:
        async with self.pool.acquire():
            result = await self.pool.fetch("SELECT chat_id FROM users WHERE role_user = 'admin'")
        admins_id = [row[0] for row in result]
        return admins_id

我假设可以将打开和关闭池添加到__aenter____aexit__

4

1 回答 1

2

class attribute您可以在异步函数中第一次需要时使用并创建池:

class Database:

    self.pool = None

    ...

    async def get_id_admins(self)
        if self.pool is None:
            self.pool = await asyncpg.create_pool(dsn=...`).

我通常使用常规类并创建一个附加到全局对象的单个实例(例如用于 Web 应用程序的aiohttp应用程序),如下所示:

class Database:

    def __init__(self, dsn):
        self.dsn = dsn
        self.pool = None

    async def connect(self):
        """Initialize asyncpg Pool"""
        self.pool = await asyncpg.create_pool(dsn=self.dsn, min_size=2, max_size=4)
        logging.info("successfully initialized database pool")

    async def get_id_admins(self):

    ...

并像这样使用它:

async def startup(app):
    await app.database.connect()

async def shutdown(app):
    await app.database.pool.close()

def main():
    app = web.Application()
    app.database = Database(app.config.DSN)
    app.on_startup.append(startup)
    app.on_shutdown.append(shutdown)
于 2020-06-02T10:08:43.207 回答