6

我使用 celery.chord(...) 创建了一组任务和一个在组中的所有任务完成后调用的方法。

我使用 amqp 结果后端(但我想切换到 memcached)。

我的工人每秒一遍又一遍地打印这条线。我不知道如何打破这个无限循环。我可以访问rabbitMQ Web 界面,但找不到ID 为“32ba5fe4-...”的东西。

[2013-03-22 14:18:26,896: INFO/MainProcess] Task celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] retry: Retry in 1s
[2013-03-22 14:18:26,897: INFO/MainProcess] Got task from broker: celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] eta:[2013-03-22 13:18:27.895123+00:00]

这是一个测试环境。没有数据可以丢失。

我使用芹菜 3.0.16

4

3 回答 3

4

它不应该是一个无限循环。

celery.chord_unlock 任务检查和弦子任务是否已完成调用合并回调任务。如果不是,它会安排自己在一秒钟内再次检查。和弦任务完成后,您将不再在日志中看到这些消息。

编辑:您可以撤销 chord_unlock 任务以停止循环

celery.control.revoke('32ba5fe4-918c-480f-8a78-a310c11d0c3a')
于 2013-03-22T17:01:13.453 回答
3

为了进行完整性检查,我max_retries在 worker 启动时设置了 via 一个信号:

from celery.signals import worker_init

@worker_init.connect
def limit_chord_unlock_tasks(worker, **kwargs):
    """
    Set max_retries for chord.unlock tasks to avoid infinitely looping
    tasks. (see celery/celery#1700 or celery/celery#2725)
    """
    task = worker.app.tasks['celery.chord_unlock']
    if task.max_retries is None:
        retries = getattr(worker.app.conf, 'CHORD_UNLOCK_MAX_RETRIES', None)
        task.max_retries = retries

CHORD_UNLOCK_MAX_RETRIES然后在我的 Celery 配置中添加一个变量。

于 2017-07-21T17:22:39.957 回答
1

我遇到过同样的问题。为了停止循环,我安装了flower,然后从Web 界面的Tasks菜单中撤消了该任务。Revoke按钮位于单击任务的 UUID 后出现的任务详细信息页面中。

于 2016-06-02T23:55:15.133 回答