9

我正在使用 Django Celery 和 Redis 来运行一些这样的任务:

header = [
    tasks.invalidate_user.subtask(args = (user)),
    tasks.invalidate_details.subtask(args = (user))
]

callback = tasks.rebuild.subtask()

chord(header)(callback)   

所以基本上与文档中所述相同。

我的问题是,当调用此任务和弦时,celery.chord_unlock任务会一直重试。中的任务header成功完成,但由于chord_unlock从未完成,callback从未被调用

猜测我的问题是无法检测到任务header是否已完成,我转向文档查看如何自定义。我找到了一个部分,描述了同步是如何实现的,提供了一个示例,我缺少的是如何调用该示例函数(即是否有此信号?)。

此外,请注意,此方法不适用于 Redis 后端:

这被除 Redis 和 Memcached 之外的所有结果后端使用,它们在标头中的每个任务之后增加一个计数器,然后在计数器超过集合中的任务数时应用回调。

但也说,Redis 方法更好:

Redis 和 Memcached 方法是更好的解决方案

那是什么方法?它是如何实施的?

那么,为什么chord_unlock从未完成,我怎样才能让它检测到已完成的header任务?

我正在使用:Django 1.4、芹菜 2.5.3、django-celery 2.5.5、redis 2.4.12

4

3 回答 3

8

您没有任务示例,但我遇到了同样的问题,我的解决方案可能适用。

我有ignore_result=True我要添加到和弦的任务,定义如下:

@task(ignore_result=True)

显然忽略结果使得 chord_unlock 任务不知道它们已经完成。在我删除了ignore_result(即使任务只返回true)之后,和弦正确地调用了回调。

于 2012-07-18T20:48:19.743 回答
0

这可能会导致这样的问题;从文档中;

笔记:

如果您在 Redis 结果后端使用和弦并且还覆盖了 Task.after_return() 方法,则需要确保调用 super 方法,否则将不会应用和弦回调。

def after_return(self, *args, **kwargs):
    do_something()
    super(MyTask, self).after_return(*args, **kwargs)

据我了解,如果您after_return的任务中有覆盖功能,则必须将其删除或至少调用超级功能。

话题底部:http ://celery.readthedocs.org/en/latest/userguide/canvas.html#important-notes

于 2014-05-26T08:40:07.940 回答
0

我遇到了同样的错误,我将代理更改为 rabbitmq 并且 chord_unlock 正在工作,直到我的任务完成(2-3 分钟的任务)

使用 redis 时,任务完成并且 chord_unlock 每 1 秒重试 8-10 次,因此回调没有正确执行。

[2012-08-24 16:31:05,804: INFO/MainProcess] Task celery.chord_unlock[5a46e8ac-de40-484f-8dc1-7cf01693df7a] retry: Retry in 1s [2012-08-24 16:31:06,817: INFO/MainProcess] Got task from broker: celery.chord_unlock[5a46e8ac-de40-484f-8dc1-7cf01693df7a] eta:[2012-08-24 16:31:07.815719-05:00]

... just like 8-10 times....

更改代理对我有用,现在我正在测试@Chris 解决方案,我的回调函数从不接收来自标题子任务的结果:S,因此,它对我不起作用。


芹菜==3.0.6

django==1.4

django-celery==3.0.6

redis==2.6

经纪人:Mac OS X 上的 redis-2.4.16

于 2012-08-24T21:32:03.883 回答