3

我有五个不同的 Django 项目,它们都在一个安装了 RabbitMQ 的盒子上运行。我将芹菜用于各种任务。每个项目似乎都在接收用于其他项目的任务。

每个代码库都有自己的虚拟环境,运行以下内容:

./manage.py celeryd --concurrency=2 --queues=high_priority

每个 settings.py 中的参数如下所示:

CELERY_SEND_EVENTS = True
CELERY_TASK_RESULT_EXPIRES = 10
CELERY_RESULT_BACKEND = 'amqp'
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERY_TIMEZONE = 'UTC'
BROKER_URL = 'amqp://guest@127.0.0.1:5672//'
BROKER_VHOST = 'specific_app_name'

我看到回溯让我认为应用程序在不应该收到彼此的消息时:

Traceback (most recent call last):
  File "/home/.../.virtualenvs/.../local/lib/python2.7/site-packages/kombu/messaging.py", line 556, in _receive_callback
    decoded = None if on_m else message.decode()
  File "/home/.../.virtualenvs/.../local/lib/python2.7/site-packages/kombu/transport/base.py", line 147, in decode
    self.content_encoding, accept=self.accept)
  File "/home/.../.virtualenvs/.../local/lib/python2.7/site-packages/kombu/serialization.py", line 187, in decode
    return decode(data)
  File "/home/.../.virtualenvs/.../local/lib/python2.7/site-packages/kombu/serialization.py", line 74, in pickle_loads
    return load(BytesIO(s))
ImportError: No module named emails.models

在这种情况下,emails.models模块出现在一个项目中,而不出现在其他项目中。然而其他人正在展示这种追溯。

我没有看过多个节点名称或类似的东西。像这样的东西会解决这个问题吗?

4

3 回答 3

1

您的 AMQP 设置celeryconfig.py错误。您正在使用:

BROKER_URL = 'amqp://guest@127.0.0.1:5672//'
BROKER_VHOST = 'specific_app_name'

BROKER_VHOST参数被忽略,因为BROKER_URL存在(也已弃用)。如果您想使用虚拟主机(顺便说一句,这是解决您提出的问题的首选方法),您应该为每个应用程序创建一个虚拟主机,并在每个应用程序设置中使用以下内容:

BROKER_URL = 'amqp://guest@127.0.0.1:5672//specific_app_name'

编辑:修复丢失/

于 2013-05-24T15:48:02.777 回答
0

您应该为每个项目指定不同的队列设置。例如:

CELERY_QUEUES = {
    "celery": {
        "exchange": "project1_celery",
        "binding_key": "project1_celery"},
}
CELERY_DEFAULT_QUEUE = "celery"

对于第二个项目,您指定 exchange 和 binding_keyproject2_celery等等。

我发布的代码适用于 Celery<3.0。如果您使用的是较新版本,它可能如下所示(我自己还没有使用过新版本,所以我不确定):

from kombu import Exchange, Queue


CELERY_DEFAULT_QUEUE = 'celery'
CELERY_QUEUES = (
    Queue('celery', Exchange('project1_celery'), routing_key='project1_celery'),
)

您可以在 celery 文档中阅读更多内容:http: //docs.celeryproject.org/en/latest/userguide/routing.html

于 2013-05-23T15:15:54.697 回答
0

由于多个 django 项目共享同一个框,您需要为每个项目显式地“命名空间”@tasks。错误消息返回“emails.models”的命名空间,这不是任何一个项目所独有的。

例如,如果一个项目名为“project1”,另一个名为“project2”,只需将“name=”参数添加到@task 装饰器:

# project1
# emails.py
@tasks(name=project1.emails.my_email_function, queue=high_priority)
def my_email_function(user_id):
    return [x of x in user_id if spam()]

# project2
# tasks.py
@tasks(name=project2.tasks.my_task_function, queue=high_priority)
def my_task_function(user_id):
    return [x of x in user_id if blahblah()]
于 2013-05-24T07:09:08.897 回答