1

我正在为 Celery 结果添加后端,我在发送任务时遇到了问题,有些被接受,有些则不被接受。

已执行和未执行的任务都显示此日志输出:

[2014-06-09 15:50:59,091: INFO/MainProcess] Received task: tasks.multithread_device_listing[e3ae6d12-ad4b-4114-9383-5802c91541f2]

那些被执行的然后显示这个输出:

[2014-06-09 15:50:59,093: DEBUG/MainProcess] Task accepted: tasks.multithread_device_listing[e3ae6d12-ad4b-4114-9383-5802c91541f2] pid:2810

虽然未执行的任务永远不会到达上述行。

我如何发送任务:

from celery import group
from time import sleep


signatures = []
signature = some_method_with_task_decorator.subtask()
signatures.append(signature)
signature = some_other_method_with_task_decorator.subtask()
signatures.append(signature)
job = group(signatures)
result = job.apply_async()
while not result.ready():
    sleep(60)

我的芹菜配置报告它是:

    software -> celery:3.1.11 (Cipater) kombu:3.0.18 py:2.7.5
                billiard:3.3.0.17 py-amqp:1.4.5
    platform -> system:Darwin arch:64bit imp:CPython
    loader   -> celery.loaders.app.AppLoader
    settings -> transport:amqp results:amqp://username:pass@localhost:5672/automated_reports

    CELERY_QUEUES:
        (<unbound Queue automated_reports -> <unbound Exchange default(direct)> -> automated_reports>,)
    CELERY_DEFAULT_ROUTING_KEY: '********'
    CELERY_INCLUDE:
        ('celery.app.builtins',
     'automated_reports.queue.tasks',
     'automated_reports.queue.subtasks')
    CELERY_IMPORTS:
        ('automated_reports.queue.tasks', 'automated_reports.queue.subtasks')
    CELERY_RESULT_PERSISTENT: True
    CELERY_ROUTES: {
        'automated_reports.queue.tasks.run_device_info_report': {   'queue': 'automated_reports'},
        'uploader.queue.subtasks.multithread_device_listing': {   'queue': 'automated_reports'},
        'uploader.queue.subtasks.multithread_individual_device': {   'queue': 'automated_reports'},
        'uploader.queue.tasks.multithread_device_listing': {   'queue': 'automated_reports'},
        'uploader.queue.tasks.multithread_individual_device': {   'queue': 'automated_reports'}}
    CELERY_DEFAULT_QUEUE: 'automated_reports'
    BROKER_URL: 'amqp://username:********@localhost:5672/automated_reports'
    CELERY_RESULT_BACKEND: 'amqp://username:pass@localhost:5672/automated_reports'

我的启动命令是:

~/Documents/Development/automated_reports/bin/celery worker --loglevel=DEBUG --autoreload -A automated_reports.queue.tasks -Q automated_reports -B --schedule=~/Documents/Development/automated_reports/log/celerybeat --autoscale=10,3

此外,当我停止 celery 时,它会将从未被接受的任务从我的队列中拉出。然后当我重新启动时,它接受它们并执行它们。

非常感谢对此行为的任何帮助。我确定这与我的后端配置有关,但我很难隔离问题或其更正。谢谢!

4

1 回答 1

1

我找到了答案。

我注意到在某些情况下,“inqueue”似乎可以正确接收任务,但在其他情况下则不然。当我搜索芹菜文档时,我发现了这个注释:http ://celery.readthedocs.org/en/latest/whatsnew-3.1.html?highlight=inqueue#caveats

我在一个长时间运行的任务中执行子任务,所以这听起来很像我看到的行为。另外,我在提到的版本上,而在以前的版本中,我在相同的配置下没有这个问题。

我添加了 -Ofair 参数来启动 worker,它立即解决了这个问题。

于 2014-06-10T14:54:11.510 回答