2

我已经阅读了很多与此类似的帖子,但对我来说似乎没有任何意义。

我正在尝试将 Celery PeriodicTask 配置为每 5 秒触发一次,但我被 Celery 配置问题挂断了(我认为)

通讯/tasks.py

import datetime
from celery.decorators import periodic_task

@periodic_task
def send_queued_messages():
    # do something...

我的应用程序/settings.py

...
from comm.tasks import send_queued_messages
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_5_seconds': {
        'task': 'comm.tasks.send_queued_messages',   # Is the issue here?  I've tried a dozen variations!!
        'schedule': timedelta(seconds=5),
        },
    }

我的错误日志的相关输出:

23:41:00 worker.1 | [2015-06-10 03:41:00,657: ERROR/MainProcess] Received unregistered task of type 'send_queued_messages'.
23:41:00 worker.1 | The message has been ignored and discarded.
23:41:00 worker.1 | 
23:41:00 worker.1 | Did you remember to import the module containing this task?
23:41:00 worker.1 | Or maybe you are using relative imports?
23:41:00 worker.1 | Please see http://bit.ly/gLye1c for more information.
23:41:00 worker.1 | 
23:41:00 worker.1 | The full contents of the message body was:
23:41:00 worker.1 | {'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'send_queued_messages', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'a8ca18...227a56'} (216b)
4

5 回答 5

11

我遇到了这个确切的问题,事实证明问题不在于任务的名称,而是 Celery 工作人员不知道您的任务模块。

换句话说,你有正确的任务名称('comm.tasks.send_queued_messages'),它是由任务装饰器生成的,你只是没有告诉 Celery 在哪里寻找它。

最快的解决方案是将以下内容添加到myapp/settings.py

CELERY_IMPORTS = ['comm.tasks']

根据文档,这确定了“工作人员启动时要导入的模块序列”。

或者,您可以将设置配置为自动发现任务(请参阅此处的文档),但是您必须将任务模块命名comm/tasks.pycomm/comm/tasks.py.

对我来说,困惑来自 Celery 的自动命名约定,它看起来像一个 import 语句,这让我相信我是在用它CELERYBEAT_SCHEDULE['task']来告诉 Celery在哪里寻找任务。相反,调度程序只是将名称作为字符串。

于 2016-06-12T20:40:25.710 回答
2

如果您在proj/proj/celery.py以下celery 文档中使用以下代码,

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

和教程看起来很完美,

不要忘记将您的新应用添加INSTALLED_APPSsettings.py

于 2017-05-11T13:12:34.903 回答
2

Celery-beat 向我抛出了同样的问题,上面写的任何内容都对我没有帮助。

我试图将此片段插入到我的设置文件中:

CELERY_IMPORTS = ['myapp.tasks']

虽然我不需要它。但这对我有帮助,因为出现了新的错误日志。在我的任务文件中,我导入了不存在的应用程序。

这可能是由非常混乱的原因引起的……很多时候有语法错误或类似的东西。

于 2017-12-08T11:40:36.443 回答
2

我只是遇到了同样的问题。我做错的是我没有正确终止我的芹菜任务之一(比如在你的情况下名为' send_queued_messages '的那个),让它在后台运行并继续发送名为'send_queued_messages'的任务。这似乎不是一个大问题,只要您的代码中仍有该任务。

但就我而言,在那之后我将任务名称修改为不同的名称(例如' comm.tasks.send_queued_messages ')。我认为这使得名为“ send_queued_messages ”的任务变成了“未注册任务”。

以下是我为解决的问题:

  1. 杀死所有芹菜进程:(如芹菜文件所示,grep 'celery' 而不是 'celery worker')

ps辅助| grep '芹菜' | awk '{打印 $2}' | xargs 杀死 -9

  1. 重新启动rabbitmq:(或您使用的任何代理)

sudo -u rabbitmq rabbitmqctl stop
sudo rabbitmq-server

然后错误消失了。

于 2016-10-13T16:37:17.260 回答
1

有关任务命名的说明,请参阅celery 文档

在这种情况下,您需要为 celerybeat 提供一个它可以找到的任务名称。

尝试这个:

CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_5_seconds': {
        'task': 'myapp.tasks.send_queued_messages', 
        'schedule': timedelta(seconds=5),
        },
    }
于 2015-06-10T17:07:57.807 回答