0

我正在使用 celery 和 Django 创建一个周期任务,我想每 X 秒运行一次。该任务应该产生几个子任务,但我需要确保每个主任务只产生一组子任务。

这就是我所拥有的。。

@periodic_task(run_every=datetime.timedelta(seconds=2))
def initialize_new_jobs():
    for obj in Queue.objects.filter(status__in=['I', 'Q']):
        obj = Queue.objects.get(id=obj.id)
        if obj.status not in ['I', 'Q']:
            continue
        obj.status = 'A'
        obj.save()
        create_other_task.delay(obj.id)

这有点工作,但感觉不对。我必须在循环开始时刷新 obj 以确保另一个正在运行的periodic_task 不会在同一个 Queue 对象上发出 create_other_task 。

有没有更好的方法来做这种工作?基本上,我想尽可能频繁地执行 create_other_task,但每个状态为 I 或 Q 的队列对象只能执行一次。

这是我的问题的简化版本,所以请忽略我可以在创建 Queue 对象时只运行 create_other_task 的事实,而不是运行周期性任务 :)

4

1 回答 1

1

您可以使用交易:

@periodic_task(run_every=datetime.timedelta(seconds=2))
@transaction.commit_on_success
def initialize_new_jobs():
    for obj in Queue.objects.select_for_update().filter(status__in=['I', 'Q']):
        obj.status = 'A'
        obj.save()
        create_other_task.delay(obj.id)

select_for_update()对行设置排他锁,以便其他用户在尝试读取值时被阻止。在事务提交或回滚后释放锁。参考

这样,您可以确定 or 的obj状态为IorQ并且obj.save()可以正常工作。

于 2013-08-25T16:04:32.103 回答