7

让我们看看下一个片段 -

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):

cursor = dbapi_con.cursor()
try:
    cursor.execute("SELECT 1")  # could also be dbapi_con.ping(),
                                # not sure what is better
except exc.OperationalError, ex:
    if ex.args[0] in (2006,   # MySQL server has gone away
                      2013,   # Lost connection to MySQL server during query
                      2055):  # Lost connection to MySQL server at '%s', system error: %d
        # caught by pool, which will retry with a new connection
        raise exc.DisconnectionError()
    else:
        raise


engine = create_engine('mysql://user:puss123@10.0.51.5/dbname', pool_recycle = 3600,pool_size=10, listeners=[check_connection])

session_factory = sessionmaker(bind = engine, autoflush=True, autocommit=False)
db_session = session_factory()

...
some code that may take several hours to run
...

db_session.execute('SELECT * FROM ' + P_TABLE + " WHERE id = '%s'" % id)        

我认为在 checkout 事件下注册 checkout_connection 函数可以解决它,但现在问题不是我想如何告诉 SQLAlchemy 处理连接丢失,所以每次我调用 execute() 时它都会检查连接是否可用,如果不是会再次启动它吗?

- - 更新 - -

SQLAlchemy 的版本是 0.7.4

- - 更新 - -

def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise


engine = create_engine(CONNECTION_URI, pool_recycle = 3600,pool_size=10)
event.listen(engine, 'checkout', checkout_listener)
session_factory = sessionmaker(bind = engine, autoflush=True, autocommit=False)
db_session = session_factory()

session_factory 被发送到每个新创建的线程

class IncidentProcessor(threading.Thread):

    def __init__(self, queue, session_factory):
        if not isinstance(queue, Queue.Queue):
            raise TypeError, "first argument should be of %s" (type(Queue.Queue))
        self.queue = queue
        self.db_session = scoped_session(session_factory)
        threading.Thread.__init__(self)

    def run(self):

    self.db_session().execute('SELECT * FROM ...')

    ...
        some code that takes alot of time
    ...

    self.db_session().execute('SELECT * FROM ...')

现在,当执行在很长一段时间后运行时,我得到“MySQL 服务器已消失”错误

4

3 回答 3

10

有一个关于这个的讨论,这个文档很好地描述了这个问题,所以我使用他们推荐的方法来处理这些错误:http ://discorporate.us/jek/talks/SQLAlchemy-EuroPython2010.pdf

它看起来像这样:

from sqlalchemy import create_engine, event
from sqlalchemy.exc import DisconnectionError


def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise


db_engine = create_engine(DATABASE_CONNECTION_INFO,
                          pool_size=100,
                          pool_recycle=3600)
event.listen(db_engine, 'checkout', checkout_listener)
于 2013-08-05T11:37:22.357 回答
3

尝试pool_recyclecreate_engine.

文档中

连接超时

MySQL 具有自动连接关闭行为,用于已空闲 8 小时或更长时间的连接。要避免出现此问题,请使用 pool_recycle 选项来控制任何连接的最大年龄:

engine = create_engine('mysql+mysqldb://...', pool_recycle=3600)

于 2014-01-15T08:49:59.743 回答
-1

你可以尝试这样的事情:

while True:
    try:
        db_session.execute('SELECT * FROM ' + PONY_TABLE + " WHERE id = '%s'" % incident_id)
        break
    except SQLAlchemyError:
        db_session.rollback()

如果连接已经消失,这将引发异常,会话将被rollbackd,它会再试一次很可能会成功。

于 2013-08-05T09:10:57.273 回答