我有一个任务calculate_common_locations通过运行一次CELERYBEAT_SCHEDULE
。该任务只是调用数据库中的一个函数:
@app.task
def calculate_common_locations():
db.execute("SELECT * FROM calculate_centroids('b')")
这是中的条目CELERYBEAT_SCHEDULE
:
CELERYBEAT_SCHEDULE = {
'common_locations': {
'task': 'clients.tasks.calculate_common_locations',
'schedule': crontab(hour=23, day_of_week='sun'), #every week
},
[..]
}
该计划包括更多每天或每 10 秒运行一次的任务。这些任务似乎不会重新运行很多次。芹菜花显示任务执行了 20 次以上。第一个按计划启动,运行约 100 秒,成功,然后再次启动。
只有一个 celerybeat 正在运行:
ps -Af | grep celerybeat
foo 24359 779 0 01:53 ? 00:00:04 [celeryd: celery@celery:MainProcess] -active- (worker --beat --app=cloud.celeryapp:app --concurrency=10 -l INFO -s /home/foo/run/celerybeat-schedule --pidfile=/home/foo/run/celerybeat.pid)
这就是 celery 的启动方式(通过 supervisord):
celery worker --beat --app=cloud.celery app:app --concurrency=10 -l INFO -s /home/foo/run/celerybeat-schedule --pidfile=/home/foo/run/celerybeat.pid
我已经在没有--concurrency=10开关的情况下对其进行了测试。数据库函数仍然执行多次。
该函数从一个经常插入(每秒几次)的大表(> 1 百万行)中读取。Postgres 锁显示所有锁都被授予。
是否有可能因为查询在某个时刻终止而重新运行任务?
在以下情况下没有问题:
- 该任务从 django shell 运行(直接或通过
.delay()
), - 任务的内容被一个轻量级的 sql 查询替换(select * from test),
- 任务的内容被替换为sleep(100)。
版本:
- 芹菜==3.1.12
- psql (PostgreSQL) 9.3.5