3

This is a followup queston from here.

Django 1.3.1, celery 2.2.7, python2.6.

I have the following in the fruits/models.py:

Consider the model:

class Fruit(models.Model):
    name = models.CharField(max_length=50)

    def __unicode__(self):
        return self.name

And the following in the fruits/tasks.py:

from django.dispatch import receiver
from django.db.models import signals

from celery.task import periodic_task, task
import fruits.models as m
import time

@task()
def check_fruit(id):
    time.sleep(2)
    try:
        fruit = m.Fruit.objects.get(pk=id)
        print "Fruit %s is found!" % fruit.name
    except m.Fruit.DoesNotExist:
        print "no such fruit"

@receiver(signals.pre_save, sender=m.Fruit, dispatch_uid="on_fruit_save")
def on_custom_feed_save(sender, instance, **kwargs):
    check_fruit.apply_async(args=[instance.id])

I launch celery daemon, then open django shell and type:

import fruits.tasks;
import fruits.models as m;
m.Fruit(name="plum").save()

Question: I would expect that the task would find the fruit, but it never does. Why?

(I'm launching task from pre-save signal on purpose to simulate a problem that happens on a large system).

4

3 回答 3

1

老问题实际上很简单,问题不是竞争条件或错误等。

该示例从头开始创建新对象,并且在 pre_save 信号期间该对象尚未存储在数据库中。所以 'instance.id' 没有设置,对于新创建的对象它是 None 。

check_fruit.delay(None)

由 pre_save 信号调用并产生

fruit = m.Fruit.objects.get(pk=None)

数据库中没有任何主键为空的行。

Celery 任务实际上是检查这个查询是否有新创建的对象。查询总是按预期抛出异常。

使用post_save信号并检查新对象的信号的创建参数。

使用 post_save 后,可能会出现与事务相关的竞争条件。大多数时候你不会在意,任务重试主要是有效的。但是如果你关心比赛条件,看看新的 django 特性,on_commit钩子

于 2015-11-26T23:10:19.660 回答
-1

这周我自己也遇到了一个非常相似的问题,除了我在没有 Django 的情况下使用 Celery。我发现 sender 参数仅在将任务实例传递给它时才起作用,而不仅仅是对 sender 类本身的引用。

我稍微研究了一下问题,发现 celery 使用 sender 参数来比较特定任务的 id 和信号的注册发送者(设置为过滤特定发送者。)

在 celery/utils/dispatch/signals.py 模块中,以下执行此评估:

def _make_id(target):  # pragma: no cover
    if hasattr(target, 'im_func'):
        return (id(target.im_self), id(target.im_func))
    return id(target) 

首先获取某个目标对象的 id。当在装饰器中指定发送者时,它被保存在类似于这样的元组中:

lookup_key = (_make_id(receiver), _make_id(sender))

稍后,当一个任务触发时,该 _live_receivers方法会被调用,该方法本质上是执行一个评估,以查看lookup_key 中指定的目标发送者是否与当前发送者的 id 匹配(触发任务):

def _live_receivers(self, senderkey):
        """Filter sequence of receivers to get resolved, live receivers.

        This checks for weak references and resolves them, then returning only
        live receivers.

        """
        none_senderkey = _make_id(None)
        receivers = []

        for (receiverkey, r_senderkey), receiver in self.receivers:
            if r_senderkey == none_senderkey or r_senderkey == senderkey:
                if isinstance(receiver, WEAKREF_TYPES):
                    # Dereference the weak reference.
                    receiver = receiver()
                    if receiver is not None:
                        receivers.append(receiver)
                else:
                    receivers.append(receiver)
        return receivers

现在我遇到的问题是,即使我指定了我想要接收信号的任务,但当它被触发时,发送者键永远不会匹配。

我可以让它正常工作的唯一方法是从__init__我为特定任务本身构建的抽象任务类的方法中注册信号。通过这样做,我能够向 Celery 传递self它为任务注册的确切实例 ( )。

我不知道这个问题是否与我的逻辑或对信号如何工作的理解有关,或者它是否是 Celery 中的一个错误,但我确实知道传递任务实例解决了问题,此后一切正常。

需要注意的一些事项:

  1. 我没有使用 Django,因此没有使用 Django-Celery 扩展
  2. 我不确定问题出在 Celery 上(更有可能是我在逻辑上犯了错误或在某处误解了某些东西)
  3. 我不知道这种情况是否适用于您,因为我不确定 Django-Celery 如何改变 Celery 后端的工作方式。

尽管如此,我希望这会有所帮助。

祝你好运!

于 2012-08-29T17:02:46.553 回答
-1

老问题和问题现在可能已经解决了,但是对于未来的访客......

听起来芹菜工人有一个长期运行的事务打开,如果它从未提交,它永远不会看到新对象。在任务中运行数据库查询之前尝试添加:

from django.db import transaction
transaction.commit()

请注意,Django 1.6 对事务管理进行了一些更改,但在撰写本文时还没有发布。

于 2013-09-05T08:39:37.803 回答