6

我有一个 Django 应用程序将对象保存到数据库和一个 celery 任务,它定期对其中一些对象进行一些处理。问题是用户可以在 celery 任务选择进行处理之后,但celery 任务实际完成处理和保存之前删除一个对象。因此,当 celery 任务确实调用 时.save(),即使用户删除了该对象,该对象也会重新出现在数据库中。当然,这对用户来说真的很可怕。

所以这里有一些显示问题的代码:

def my_delete_view(request, pk):
    thing = Thing.objects.get(pk=pk)
    thing.delete()
    return HttpResponseRedirect('yay')

@app.task
def my_periodic_task():
    things = get_things_for_processing()
    # if the delete happens anywhere between here and the .save(), we're hosed
    for thing in things:
        process_thing(thing) # could take a LONG time
        thing.save()

我想过通过添加一个原子块和一个事务来尝试修复它,以在保存之前测试对象是否确实存在:

@app.task
def my_periodic_task():
    things = Thing.objects.filter(...some criteria...)
    for thing in things:
        process_thing(thing) # could take a LONG time
        try:
            with transaction.atomic():
                # just see if it still exists:
                unused = Thing.objects.select_for_update().get(pk=thing.pk)
                # no exception means it exists. go ahead and save the
                # processed version that has all of our updates.
                thing.save()
         except Thing.DoesNotExist:
             logger.warning("Processed thing vanished")

这是做这种事情的正确模式吗?我的意思是,我会在生产中运行它的几天内找出它是否可以工作,但是很高兴知道是否有任何其他被广泛接受的模式来完成这类事情。

我真正想要的是能够更新一个对象,如果它仍然存在于数据库中。我对用户编辑和来自 的编辑之间的竞争感到满意process_thing,我总是可以在refresh_from_db之前添加一个,process_thing以最大限度地减少用户编辑丢失的时间。但是在用户删除它们后,我绝对不能让对象重新出现。

4

2 回答 2

0

如果你在处理 celery 任务的时候打开一个事务,你应该避免这样的问题:

@app.task
@transaction.atomic
def my_periodic_task():
    things = get_things_for_processing()
    # if the delete happens anywhere between here and the .save(), we're hosed
    for thing in things:
        process_thing(thing) # could take a LONG time
        thing.save()

有时,您想向前端报告您正在处理数据,因此您可以添加select_for_update()到您的查询集(很可能在 get_things_for_processing 中),然后在负责删除的代码中,您需要在 db 报告时处理错误特定记录被锁定。

于 2016-06-05T10:34:27.410 回答
0

就目前而言,“再次原子选择,然后保存”的模式似乎就足够了:

@app.task
def my_periodic_task():
    things = Thing.objects.filter(...some criteria...)
    for thing in things:
        process_thing(thing) # could take a LONG time
        try:
            with transaction.atomic():
                # just see if it still exists:
                unused = Thing.objects.select_for_update().get(pk=thing.pk)
                # no exception means it exists. go ahead and save the
                # processed version that has all of our updates.
                thing.save()
         except Thing.DoesNotExist:
             logger.warning("Processed thing vanished")

(这与我原来的问题中的代码相同)。

于 2016-06-16T04:03:01.953 回答