8

我正在使用 Python 3.8 内置的 asyncio 包和另外安装的 aiomysql 包来实现异步 MySQL 查询。即使我已经正确关闭了所有打开的游标和连接,控制台上仍然会出现如下错误消息。

An open stream object is being garbage collected; call "stream.close()" explicitly.

代码摘要如下...

#db.py

import asyncio

class AsyncMysqlSession:
    """Async context manager holding one aiomysql connection and one DictCursor.

    Entering opens the connection described by ``db_settings`` and creates the
    cursor; exiting closes the cursor and then the connection.
    """

    def __init__(self, loop, db_settings=DEFAULTDB):
        # db_settings is indexed with the keys HOST, PORT, USER, PASSWORD, NAME.
        self.db_settings = db_settings
        # Event loop that is passed through to aiomysql.connect().
        self.loop = loop

    async def __aenter__(self):
        """Open the connection and a DictCursor, then return this session."""
        self.conn = await aiomysql.connect(host=self.db_settings['HOST'],
                                       port=self.db_settings['PORT'],
                                       user=self.db_settings['USER'],
                                       password=self.db_settings['PASSWORD'],
                                       db=self.db_settings['NAME'],
                                       loop=self.loop)
        self.cursor = await self.conn.cursor(aiomysql.cursors.DictCursor)
        return self

    async def __aexit__(self, exception, value, traceback):
        """Close the cursor, then the connection; exceptions are not suppressed."""
        await self.cursor.close()
        # NOTE(review): close() is synchronous and nothing is awaited after it;
        # presumably the socket teardown is still pending when the session is
        # dropped, which may be what triggers the "open stream object" warning
        # -- confirm against the aiomysql Connection docs (ensure_closed()).
        self.conn.close()

    async def query(self, sql, *args):
        """Execute ``sql``, commit, and return every fetched row as a list."""
        # NOTE(review): `values` is not defined in this method (the parameters
        # arrive as *args), so this line looks like it raises NameError unless
        # a module-level `values` exists -- confirm.
        await self.cursor.execute(sql, values)
        await self.conn.commit()
        rows = await self.cursor.fetchall()
        return list(rows)


async def aiomysql_query(sql, *args):
    """
    Run one query inside a fresh AsyncMysqlSession and return its rows.

    Takes the loop from asyncio.get_event_loop(), opens a session on it,
    forwards ``sql`` and ``*args`` to ``AsyncMysqlSession.query`` and returns
    that result; the session is exited before the value reaches the caller.
    """
    loop = asyncio.get_event_loop()
    async with AsyncMysqlSession(loop) as mysql:
        db_result = await mysql.query(sql, *args)
        return db_result

aiomysql_query 被导入另一个文件

#views.py

 import asyncio
 .....
 

 async def main():
     .....
     .....
     
     await aiomysql_query(sql1, *args1)
     await aiomysql_query(sql2, *args2)

 .....

 asyncio.run(main())

 ....

是我在这里做错了什么,还是这条错误消息本身是误报?任何有助于解决此问题的线索都将不胜感激…… TIA!

4

1 回答 1

1

看起来您可能只是忘记关闭事件循环了——此外,还应按照 @VPfB 在上面的建议调用 await conn.wait_closed()。

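手动使用较低级别的方法(例如 asyncio.get_event_loop())获取事件循环时,需要自行负责关闭它。具体来说,必须调用 self.loop.close()。注意:loop.close() 只能在事件循环停止运行之后调用,对仍在运行的循环调用会抛出 RuntimeError;如果使用 asyncio.run(),事件循环会在结束时被自动关闭。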

#db.py

import asyncio

class AsyncMysqlSession:
    """Async context manager owning one aiomysql connection and one DictCursor.

    Usage::

        async with AsyncMysqlSession(loop) as mysql:
            rows = await mysql.query("SELECT ...", param1, param2)

    Entering opens the connection described by ``db_settings`` (keys ``HOST``,
    ``PORT``, ``USER``, ``PASSWORD``, ``NAME``) and creates the cursor. Exiting
    closes the cursor and shuts the connection down. The event loop itself is
    NOT closed here; it belongs to whoever started it (e.g. ``asyncio.run``).
    """

    def __init__(self, loop, db_settings=DEFAULTDB):
        self.db_settings = db_settings
        self.loop = loop
        # Populated by __aenter__.
        self.conn = None
        self.cursor = None

    async def __aenter__(self):
        """Open the connection and a DictCursor, then return this session."""
        settings = self.db_settings
        self.conn = await aiomysql.connect(
            host=settings['HOST'],
            port=settings['PORT'],
            user=settings['USER'],
            password=settings['PASSWORD'],
            db=settings['NAME'],
            loop=self.loop,
        )
        try:
            self.cursor = await self.conn.cursor(aiomysql.cursors.DictCursor)
        except Exception:
            # __aexit__ is not invoked when __aenter__ raises, so release the
            # connection here instead of leaking it.
            self.conn.close()
            raise
        return self

    async def __aexit__(self, exception, value, traceback):
        """Close the cursor and the connection; never suppresses exceptions."""
        try:
            await self.cursor.close()
        finally:
            if exception is not None:
                # The connection may be in a broken state after an error, so
                # just drop the socket rather than talking to the server.
                self.conn.close()
            else:
                # ensure_closed() is the awaitable shutdown: it sends the quit
                # command and then closes the socket, instead of only
                # scheduling the transport close the way a bare close() does.
                await self.conn.ensure_closed()
        # Do not call self.loop.close() here: this coroutine is executing on
        # that very loop, and closing a running loop raises RuntimeError. It
        # would also make every later query on the same loop fail.

    async def query(self, sql, *args):
        """Execute ``sql`` with ``args`` as parameters and return all rows.

        The statement is committed before the rows are fetched. Returns a list
        of dict rows (the cursor is a DictCursor).
        """
        # Pass None rather than an empty tuple when there are no parameters so
        # the driver skips %-interpolation and literal '%' in ``sql`` is safe.
        await self.cursor.execute(sql, args or None)
        await self.conn.commit()
        rows = await self.cursor.fetchall()
        return list(rows)


async def aiomysql_query(sql, *args):
    """
    Run one query inside a fresh AsyncMysqlSession and return its rows.

    ``sql`` and ``*args`` are forwarded unchanged to
    ``AsyncMysqlSession.query``. The session (cursor and connection) is
    closed before the result is handed back to the caller.
    """
    # Inside a coroutine there is always a running loop; get_running_loop()
    # returns exactly that loop and never creates a new one implicitly.
    loop = asyncio.get_running_loop()
    async with AsyncMysqlSession(loop) as mysql:
        return await mysql.query(sql, *args)

参考

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.shutdown_asyncgens

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_event_loop

于 2021-09-06T06:19:13.757 回答