3

我有一个程序可以通过 Python 的 itertools 生成大量产品列表;基本上是一大组各种单词组合。通过 products() 函数运行多个集合。我想做的是让 Celery 帮助解决这个问题,将每组分配给不同的 Celery 任务,然后最后将它们组合起来。我的理解是和弦是做到这一点的方法。

所以基本上我有这个:

callback = tabulate_results.subtask()
header = []
for combo in combos_to_run:
    header.append(run_product.subtask(args=(object_terms, combo)))
result = chord(header)(callback)
result.get()

以及它的两个支持性、精简的功能:

from celery import subtask, chord
@task()
def run_product(object_list, combo_set):
    results = []
    for result in product(object_list, *combo_set):
        results.append(result)
    return results

@task()
def tabulate_results(result_sets):
    master_set = []
    for result_set in result_sets:
        master_set.extend(result_set)

    return master_set

起初,和弦任务出现在 celeryev 中,但我在这里引用了一个问题:Django Celery - Missing something but I don't know what? 有结果,但无法让他们知道 Celery 返回与通过 MySQL 进行结果跟踪有关的错误。我确实使用 MySQL 作为我的结果后端,并将其切换到 Redis 摆脱了它。但是,现在我遇到了一个新问题。当我通过 Django shell 运行代码时,celeryev 中没有出现任何任务,除此之外没有返回任何内容:

R IS: <GroupResult: 9f658e8d-591f-4fa9-9e79-4db0c51e8331 [9b199d1e-061f-413c-9521-4a3051dd121a, 2effbfb5-c9dc-4569-a63f-656c233a9387, 80911a60-6a22-46bb-83a1-d5a84c659794, 70acfa43-8ffe-4bc8-8ff1-1df6def035e1, dd417423-d1f6-44eb-8c4b-2ded40d7614f, fbff8adc-815d-459c-b914-b30528dbbd39]>

基本上是一条 Celery 消息,但没有数据。代码也永远不会返回,我的光标悬空。当我按 C 键退出时,行号是 Celery 中似乎正在等待的东西。我不知道会发生什么,因为 celeryev 没有给我任何任务。我已经确认其他任务确实显示在 celeryev 中。

我已经正常测试了我的函数,没有将它们作为 Celery 任务运行,并且它们正常返回。

简短版本 我试图让 Celery 和弦帮助我的 Django 应用程序中的密集 Python 任务,但它们似乎没有返回任何结果或进入 celeryev。Redis 后端。芹菜版本 3.0.15。姜戈 1.4。

4

1 回答 1

2

首先,让一个任务等待另一个任务的结果通常被认为是不好的做法(这就是为什么应该使用链式子任务)——来源

除此之外,您为和弦粘贴的语法不正确。这是相关链接: http ://docs.celeryproject.org/en/master/getting-started/next-steps.html#chords 。除其他问题外,您的代码不会将任何任务传递给和弦。和弦只是一组作为一个组处理的任务,之后触发回调。从文档中,这是和弦的正确语法:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90

请注意,和弦有两个参数,一方面是初始任务组,另一方面是回调(用逗号分隔)。

这是一个开始,但听起来花更多时间在文档上会比任何事情都更有帮助。

于 2013-03-18T22:02:37.003 回答