5

我在 python 中使用 sqlalchemy 包。在对现有表执行自动加载后,我有一个操作需要一些时间才能执行。当我尝试使用连接时,这会导致以下错误:

sqlalchemy.exc.OperationalError: (OperationalError) (2006, 'MySQL server has gone away')

我有一个简单的实用函数,可以执行多次插入:

def insert_data(data_2_insert, table_name):
    engine = create_engine('mysql://blah:blah123@localhost/dbname')
    # Metadata is a Table catalog. 
    metadata = MetaData()
    table = Table(table_name, metadata, autoload=True, autoload_with=engine)
    for c in mytable.c:
        print c
    column_names = tuple(c.name for c in mytable.c)
    final_data = [dict(zip(column_names, x)) for x in data_2_insert]
    ins = mytable.insert()
    conn = engine.connect()
    conn.execute(ins, final_data)
    conn.close()

由于“data_2_insert”有 677,161 行,因此执行时间很长。

final_data = [dict(zip(column_names, x)) for x in data_2_insert]

我遇到了这个问题,它指的是类似的问题。但是我不确定如何实现接受的答案建议的连接管理,因为 robots.jpg 在评论中指出了这一点:

SQLAlchemy 0.7 的注意事项 - PoolListener 已弃用,但可以使用新的事件系统实现相同的解决方案。

如果有人可以请告诉我一些关于如何将这些建议整合到我使用 sqlalchemy 的方式的指示,我将非常感激。谢谢你。

4

2 回答 2

12

我想你正在寻找这样的东西:

from sqlalchemy import exc, event
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")  # could also be dbapi_con.ping(),
                                    # not sure what is better
    except exc.OperationalError, ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            # caught by pool, which will retry with a new connection
            raise exc.DisconnectionError()
        else:
            raise

如果您希望有条件地触发此策略,则应避免在此处使用装饰器,而是使用listen()函数注册侦听器:

# somewhere during app initialization
if config.check_connection_on_checkout:
    event.listen(Pool, "checkout", check_connection)

更多信息:

于 2013-04-02T10:40:08.513 回答
0

现在有更好的方法来处理它 - pool_recycle

engine = create_engine('mysql://...', pool_recycle=3600)

MySQL 的默认超时时间为 8 小时。这导致连接被 MySQL 关闭,但它上面的引擎(例如 SQLAlchemy)不知道它。

有两种方法可以解决它 -

  1. 乐观- 使用pool_recycle
  2. 悲观- 使用pool_pre_ping=True

我更喜欢使用 pool_recycle ,因为它不会SELECT 1在每次查询之前发出一个 - 减少对数据库的压力

于 2021-10-07T13:12:11.353 回答