Celery 不支持这种开箱即用的方法,但我过去不得不做类似的事情,我不得不自己偶然发现一个解决方案。
根据我的经验,有两种比较直接的方法可以实现这一点,两者都需要权衡取舍。您还可以使用这些东西进入一些相当大的漏洞,所以请谨慎购买。
选项1:
使用一些数据存储来保存有关何时运行任务并触发 celery beat 任务的信息。
为此,您可以使用您的数据库和一个包含有关周期性任务的一些信息的模型。(如果你想获得更多的技术,你甚至可以直接与队列交谈并跳过模型路线,可能也是如此。)
from django.db import models
class PeriodicTask(models.Model):
lastrun = models.DateTimeField()
nextrun = models.DateTimeField()
notes = models.TextField() # errors?
task_id = models.CharField(max_length=100)
这只是对模型可能存储的内容的粗略概念。您可以将任何有用的东西放在那里,但您需要一个日期时间对象来存储下一次运行的时间。
接下来,您的周期性任务需要更频繁地运行以启动并查看是否有任何任务需要尽快执行:
import datetime
from .models import PeriodicTask
@periodic_task(run_every=timedelta(minutes=2), queue='activities', options={'queue': 'activities'})
def pull_activities_frequent_adaptors():
now = datetime.datetime.utcnow() # need to be clear about time-zones
scheduled_tasks = PeriodicTask.objects.filter(nextrun__gte=now)
if scheduled_tasks and scheduled_tasks.count() == 1: # more than one and we've erred somewhere
timewindow = datetime.timedelta(minutes=5)
if (scheduled_tasks[0].nextrun - now) <= timewindow:
scheduled_tasks[0].delete()
# Do the task
# schedule the next one
PeriodicTask.objects.create(
lastrun=now,
nextrun=now + datetime.timedelta(minutes=30))
潜在问题:
1)如果您有多个具有主从设置的数据库,特别是如果您有滞后,您最终可能会进行双重调度(即使是count() == 1
部分)。因此,有一个值得思考的竞争条件。
2) 很难精确到 30 分钟,因为您必须使用时间窗口来查找要执行的任务。
3)任务需要比你的时间窗口更频繁地运行,否则你可能会错过它。这可能是一种资源浪费(但我想这并不算太糟糕),因为它通常会旋转并且什么都不做。
4) 没有什么比处理日期时间更令人难以置信的了,所以你必须真正考虑时区的事情并考虑所有的变化并测试这段代码。
5)这是一个大问题:如果任务的运行时间比它计划的时间间隔长,那么你将有两个任务同时运行,这是一个问题。同样,在竞争条件下,事情可能会变得很冒险。
选项 2)
不要使用 celery beat:启动第一个任务并在 30 分钟后启动另一个任务。这有可能成为一个失控的巫师学徒类型的东西,所以我觉得它有点,嗯,可怕,虽然我做了第一个选择,但我从来没有真正说服自己去做下面的事情。但是,无论如何,我认为可以做到:
@task # no longer a periodic task
def your_task(args):
# Whatever you want to do, then call itself again...
your_task.apply_async(args=(args), countdown=1800)
现在你只需要在某个地方调用它,可能是在一个每周启动一次并杀死这个东西的任何以前版本(它是如何找到它们的?)的 cron 作业中,然后启动第一个。
我不得不说我真的不喜欢这个答案,即使我发生过几次,这似乎是解决问题的一种更危险和不守规矩的方式。不过,如果有人这样做,我会很好奇。