2

我在一个应用程序中使用 sqlalchemy,其中 celery 任务使用范围会话与 mysql 交互。

数据库.py

engine = create_engine(
    'mysql://root:pass@localhost/testdb?charset=utf8&use_unicode=0',
    pool_recycle=3600)

db_session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine))

我使用计划定期运行的芹菜任务(使用芹菜节拍)并创建使用会话的任务的任务集

@celery.task
def create_tasks():    
    tasks = []
    tasks.append(charge_now.subtask((tid,)))
    job = TaskSet(tasks)
    job.apply_async()

以下是构成 TaskSet 的任务

@celery.task
def charge_now(tid):
    transaction = db_session.query(TransactionInfo).get(tid)

(之后我正在插入。为了清楚起见,我省略了代码中的一些逻辑)

以下是我得到的错误

任务tasks.charge_now[bf199346-0862-4709-95b8-053859ed5a6f] 引发异常:ResourceClosedError('此结果对象不返回行。它已自动关闭。',)

我知道这是由多个线程试图共享同一个会话造成的问题,但有人可以指出如何解决这个问题吗?

4

0 回答 0