我在一个应用程序中使用 sqlalchemy,其中 celery 任务使用范围会话与 mysql 交互。
数据库.py
engine = create_engine(
'mysql://root:pass@localhost/testdb?charset=utf8&use_unicode=0',
pool_recycle=3600)
db_session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine))
我使用计划定期运行的芹菜任务(使用芹菜节拍)并创建使用会话的任务的任务集
@celery.task
def create_tasks():
tasks = []
tasks.append(charge_now.subtask((tid,)))
job = TaskSet(tasks)
job.apply_async()
以下是构成 TaskSet 的任务
@celery.task
def charge_now(tid):
transaction = db_session.query(TransactionInfo).get(tid)
(之后我正在插入。为了清楚起见,我省略了代码中的一些逻辑)
以下是我得到的错误
任务tasks.charge_now[bf199346-0862-4709-95b8-053859ed5a6f] 引发异常:ResourceClosedError('此结果对象不返回行。它已自动关闭。',)
我知道这是由多个线程试图共享同一个会话造成的问题,但有人可以指出如何解决这个问题吗?