0

我遇到了一个自己很难弄清楚的问题。我有一个 Python 脚本,它使用 aiomysql 连接到 MySQL 并运行一些查询。

一开始它工作得很好,但过了一段时间,我开始遇到错误,例如:

2021-11-03 10:40:21 36934 [Warning] Aborted connection 36934 to db: 'database' user: 'user' host: 'localhost' (Got an error reading communication packets)

在代码这一侧,我得到了以下回溯(traceback):

Traceback (most recent call last):
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/connection.py", line 502, in _connect
    await self._get_server_information()
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/connection.py", line 989, in _get_server_information
    packet = await self._read_packet()
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/connection.py", line 561, in _read_packet
    packet_header = await self._read_bytes(4)
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/connection.py", line 598, in _read_bytes
    data = await self._reader.readexactly(num_bytes)
  File "/usr/lib/python3.7/asyncio/streams.py", line 679, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.7/asyncio/streams.py", line 473, in _wait_for_data
    await self._waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/var/www/project/server/workers/utils/aiosmtp.py", line 402, in _handle_client
    await method(arg)
  File "/var/www/project/server/workers/utils/aiosmtp.py", line 836, in smtp_RCPT
    status = await self._call_handler_hook('RCPT', address, rcpt_options)
  File "/var/www/project/server/workers/utils/aiosmtp.py", line 125, in _call_handler_hook
    return await hook(self, self.session, self.envelope, *args)
  File "/var/www/project/server/workers/smtpd.py", line 622, in handle_RCPT
    details = await AsyncDatabase.one("SELECT id FROM accounts WHERE email = %(email)s", email=email)
  File "/var/www/project/server/workers/utils/db.py", line 54, in one
    async with pool.acquire() as connection:
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/utils.py", line 98, in __aenter__
    self._conn = await self._coro
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/pool.py", line 135, in _acquire
    await self._fill_free_pool(True)
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/pool.py", line 181, in _fill_free_pool
    **self._conn_kwargs)
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/connection.py", line 75, in _connect
    await conn._connect()
  File "/var/www/project/env/lib/python3.7/site-packages/aiomysql/connection.py", line 523, in _connect
    self._host) from e
pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '127.0.0.1'")

我使用的数据库库是 aiomysql,并通过连接池来管理连接。下面是相关代码(之后还有一个使用示例):

import asyncio
import os

import aiomysql


class AsyncDatabase(object):
    """Class-level facade that runs queries through one shared aiomysql pool.

    A single instance is created lazily and stored on the class; that
    instance owns the connection pool.  Callers use the ``query`` / ``one`` /
    ``all`` classmethods and never instantiate the class themselves.
    Acquiring, cleaning up and releasing a connection for each call is
    delegated to ``PoolEntry``.
    """

    # Lazily created singleton instance; see ``pool()``.
    _instance = None

    def __init__(self):
        # NOTE: on instances this attribute shadows the ``pool`` classmethod.
        # The classmethod is only ever invoked through the class itself.
        self.pool = None
        # Built on first use inside ``get_pool`` so the lock is created while
        # the event loop that will use it is already running.
        self._pool_lock = None

    async def get_pool(self):
        """Return the shared aiomysql pool, creating it on first use.

        Pool creation awaits, so several coroutines could otherwise all see
        "no pool yet" and each create their own; every pool but the last one
        stored would then be lost without being closed.  A lock plus a second
        check inside it guarantees a single pool.

        Returns:
            The pool object returned by ``aiomysql.create_pool``.
        """
        if self.pool is not None:
            return self.pool

        # No await between the check and the assignment, so creating the
        # lock itself cannot race.
        if self._pool_lock is None:
            self._pool_lock = asyncio.Lock()

        async with self._pool_lock:
            # Re-check: another coroutine may have created the pool while we
            # were waiting for the lock.
            if self.pool is None:
                self.pool = await aiomysql.create_pool(
                    minsize=1,
                    maxsize=10,
                    db=os.getenv('DATABASE_NAME'),  # dbname?
                    user=os.getenv('DATABASE_USER'),
                    password=os.getenv('DATABASE_PASS'),
                    # TCP settings, currently disabled in favour of the
                    # UNIX socket below:
                    #host=os.getenv('DATABASE_HOST'),
                    #port=int(os.getenv('DATABASE_PORT', 3306)),
                    unix_socket='/run/mysqld/mysqld.sock',
                    charset="utf8",
                    autocommit=False,
                    pool_recycle=60
                )

        return self.pool

    @classmethod
    async def pool(cls):
        """Return a fresh ``PoolEntry`` bound to the singleton's pool.

        The singleton instance is created on the first call.
        """
        if cls._instance is None:
            cls._instance = cls()

        return PoolEntry(await cls._instance.get_pool())

    @classmethod
    async def query(cls, query, **kwargs):
        """Run ``query`` without returning any rows.

        Args:
            query: SQL text using ``%(name)s`` style placeholders.
            **kwargs: Values for the named placeholders.

        Returns:
            Always ``None``.
        """
        entry = await cls.pool()
        async with entry as cursor:
            await cursor.execute(query, kwargs)

        return None

    @classmethod
    async def one(cls, query, **kwargs):
        """Run ``query`` and return the result of ``cursor.fetchone()``.

        Args:
            query: SQL text using ``%(name)s`` style placeholders.
            **kwargs: Values for the named placeholders.
        """
        entry = await cls.pool()
        async with entry as cursor:
            await cursor.execute(query, kwargs)
            return await cursor.fetchone()

    @classmethod
    async def all(cls, query, **kwargs):
        """Run ``query`` and return the result of ``cursor.fetchall()``.

        Args:
            query: SQL text using ``%(name)s`` style placeholders.
            **kwargs: Values for the named placeholders.
        """
        entry = await cls.pool()
        async with entry as cursor:
            await cursor.execute(query, kwargs)
            return await cursor.fetchall()


class PoolEntry:
    """Single-use async context manager yielding a cursor from a pool.

    On entry a connection is acquired from ``pool`` and a cursor is opened on
    it; the cursor is what ``async with`` binds.  On exit the cursor is
    closed, the transaction is finished (commit on success, rollback when the
    body raised) and the connection is handed back to the pool.

    Finishing the transaction matters when the pool is created with
    ``autocommit=False``: otherwise every connection goes back to the pool
    with a transaction still open and no write is ever committed.
    """

    def __init__(self, pool):
        self.pool = pool
        # Populated by ``__aenter__``; defined here so ``__aexit__`` never
        # hits a missing attribute.
        self.conn = None
        self.cursor = None

    async def __aenter__(self):
        """Acquire a connection, open a cursor on it and return the cursor."""
        self.conn = await self.pool.acquire()
        try:
            self.cursor = await self.conn.cursor()
        except BaseException:
            # ``__aexit__`` is not called when ``__aenter__`` raises, so the
            # connection must be returned here or it leaks from the pool.
            # BaseException also covers task cancellation.
            conn, self.conn = self.conn, None
            self.pool.release(conn)
            raise

        return self.cursor

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """Close the cursor, end the transaction and release the connection.

        Raises:
            Whatever ``commit()`` raises on the success path; a failed commit
            means the work was not persisted, so it is not swallowed.
        """
        cursor, conn, pool = self.cursor, self.conn, self.pool
        # Detach everything first: the entry is single-use and must not keep
        # references to a connection that is back in the pool.
        self.cursor = None
        self.conn = None
        self.pool = None

        try:
            if cursor is not None and not cursor.closed:
                try:
                    await cursor.close()
                except Exception:
                    # Best effort: a cursor that fails to close must not
                    # prevent the connection from being released.
                    pass

            if conn is not None:
                if exc_type is None:
                    await conn.commit()
                else:
                    try:
                        await conn.rollback()
                    except Exception:
                        # Best effort: keep the body's original exception
                        # as the one that propagates.
                        pass
        finally:
            # Always give the connection back, even if cleanup failed.
            if conn is not None:
                pool.release(conn)

我使用的查询示例:

# Example call: run a parameterised SELECT and fetch the first row via ``one``.
user = await AsyncDatabase.one("SELECT id FROM accounts WHERE email = %(email)s", email=email)

我怀疑这个连接池不知为何被弄乱了,进而导致了后面的这些问题,但我无法解释这是如何发生的,也不知道原因。

如果需要,我很乐意提供任何其他详细信息。

先感谢您。

4

0 回答 0