我有一个名为mainsite
.
该站点以相当异步的方式工作,主要是通过从视图启动的线程来执行后端操作。
它使用 sqlalchemy 连接到 mysql,并使用 ZopeTransactionExtension 进行会话管理。
到目前为止,该应用程序运行良好。
我需要在它上面运行定期作业,它需要使用一些从视图中启动的相同异步函数。
我使用了 apscheduler,但遇到了问题。所以我想到了使用 celery beat 作为一个单独的进程,将 mainapp 视为一个库并导入要使用的函数。
我的 celery 配置如下所示:
from datetime import timedelta
from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \
AUTH_DELETE_TIME
BROKER_URL = 'sqla+mysql://em:em@localhost/edgem'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://em:em@localhost/edgem'
CELERYBEAT_SCHEDULE = {
'rerun': {
'task': 'tasks.rerun_scheduler',
'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL)
},
'automate': {
'task': 'tasks.automation_scheduler',
'schedule': timedelta(seconds=20)
},
'remove-tokens': {
'task': 'tasks.token_remover_scheduler',
'schedule': timedelta(seconds=2 * 24 * 3600 )
},
}
CELERY_TIMEZONE = 'UTC'
tasks.py 是
from celery import Celery
celery = Celery('tasks')
celery.config_from_object('celeryconfig')
@celery.task
def rerun_scheduler():
from mainsite.task import check_update_rerun_tasks
check_update_rerun_tasks()
@celery.task
def automation_scheduler():
from mainsite.task import automate
automate()
@celery.task
def token_remover_scheduler():
from mainsite.auth_service import delete_old_tokens
delete_old_tokens()
请记住,上述所有函数都会立即返回,但如果需要则启动线程
线程通过执行将对象保存到 db 中transaction.commit() after session.add(object)
。
问题是整个事情像宝石一样只能工作大约 30 分钟。之后,ResourceClosedError: The transaction is closed
只要有transaction.commit()
. 我不确定是什么问题,我需要帮助进行故障排除。
我在任务中导入的原因是为了摆脱这个错误。认为每次需要运行任务时都导入是一个好主意,我可能每次都会得到一个新事务,但看起来情况并非如此。