1

任务.py

@task
def daterange():
    import datetime
    some = Poll.objects.all()
    schedule = Schedule.objects.all()
    for c in schedule :
        p = c.poll
        e = c.end_time
        s = c.start_time
        n = int(c.no_of_response)
        ph = Response.objects.filter(poll = p).exclude(sid = 'Null').count()
        now = timezone.now()
        if (c.start_time <= now) & (now <= c.end_time):
            if (n == 0) | (n > ph):
                c.poll.status='Running'
                c.poll.save()
            elif(n == ph):
                c.poll.status='Complete'
                c.poll.save()
                #time.sleep(1000)
            else:
                c.poll.status='Out of Bound'
                c.poll.save()
        elif c.end_time < now:
            c.poll.status='Complete'
            c.poll.save()
        elif c.start_time > now:
            c.poll.status='New'
            c.poll.save()

模型.py

class Schedule(models.Model):
        poll = models.ForeignKey(Poll, on_delete=models.CASCADE)
        start_time = models.DateTimeField() 
        end_time = models.DateTimeField() 
        no_of_response = models.IntegerField(default = 0)

我每 10 秒安排了一个 celery 运行任务。周期性任务包括根据开始时间和结束时间更新民意调查的代码。如果轮询在开始时间和结束时间之间,则将轮询状态更新为正在运行。如果当前状态正在运行,那么当到达结束时间时如何停止轮询。如何杀死该项目中的芹菜任务?

4

1 回答 1

0

重试任务直到成功执行。可能的实现示例:

from celery import current_task

# decorator to retry task
def retry_if_false(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        if not func(*args, **kwargs):
           current_task.retry()
    return wrapped

 @retry_if_false
 @task
 def daterange():
     ..<your code>..
     return c.poll.status == 'Complete'  # return True if polling finished.

现在您可以像普通任务一样开始您的任务,无需将其添加到 celerybeat。

于 2016-04-26T08:49:07.220 回答