
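My real-world situation is that I want to get a list of campaigns from an API call and trigger a chain of functions for each campaign. Once all of the chains have completed, I need to call a function that reports the results.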

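I've tried to simplify this as much as possible and have the following code. It runs, but the chord unlock function is called before the chains have completed. In this code, that means it can't sum the array of results.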

import time

from celery import Celery, chain, chord, group

app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')


@app.task
def generate():
    return [1, 2, 3, 4, 5]


@app.task
def dmap(it, first, second):
    chains = []
    for arg in it:
        c = chain(first.clone([arg, ]), second)
        chains.append(c)

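    # Starts the chains and returns their GroupResult right away;
    # it does not wait for the chains to finish.
    return group(chains)()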


@app.task
def add(x, y):
    print 'add {x} {y}'.format(x=x, y=y)
    time.sleep(3)
    return x + y


@app.task
def mul(x, y):
    print 'mul {x} {y}'.format(x=x, y=y)
    time.sleep(2)
    return x * y


@app.task
def xsum(numbers):

    print numbers
    to_sum = []
    for x in numbers[0]:
        to_sum.append(x.result)
    print to_sum

    return sum(to_sum)

if __name__ == '__main__':

    x = add.s(0)
    y = mul.s(1)

    workers = generate.si() | dmap.s(x, y)

    result = chord(workers)(xsum.s())
    print result.get()

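The dmap function is based on this answer. I've also looked at this answer. The last link suggests that what I want to do may not be possible, because "there is nothing to synchronize, since groups happen in parallel".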

I can't work out how to make the solution work when the generate function returns an array rather than a single item.

Running the above, the log shows the chord unlocking (early?), so xsum tries to sum an array of results in which 3 of the values are None:

[2014-11-11 14:03:10,308: INFO/MainProcess] Received task: tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51]
[2014-11-11 14:03:10,311: INFO/MainProcess] Received task: celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] eta:[2014-11-11 14:03:11.307477+00:00]
[2014-11-11 14:03:10,338: INFO/MainProcess] Received task: tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18]
[2014-11-11 14:03:10,365: INFO/MainProcess] Task tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51] succeeded in 0.0523488249746s: [1, 2, 3, 4, 5]
[2014-11-11 14:03:10,386: INFO/MainProcess] Received task: tasks.add[eccf5faa-069c-4634-826e-af5793a11c68]
[2014-11-11 14:03:10,388: WARNING/Worker-2] add 1 0
[2014-11-11 14:03:10,390: INFO/MainProcess] Received task: tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961]
[2014-11-11 14:03:10,392: WARNING/Worker-1] add 2 0
[2014-11-11 14:03:10,394: INFO/MainProcess] Received task: tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52]
[2014-11-11 14:03:10,397: INFO/MainProcess] Received task: tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c]
[2014-11-11 14:03:10,398: INFO/MainProcess] Received task: tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d]
[2014-11-11 14:03:10,399: WARNING/Worker-4] add 3 0
[2014-11-11 14:03:10,401: INFO/MainProcess] Task tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18] succeeded in 0.061700456019s: <GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195,...
[2014-11-11 14:03:10,402: WARNING/Worker-3] add 4 0
[2014-11-11 14:03:13,409: INFO/MainProcess] Received task: tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344]
[2014-11-11 14:03:13,410: INFO/MainProcess] Received task: tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b]
[2014-11-11 14:03:13,418: INFO/MainProcess] Received task: tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d]
[2014-11-11 14:03:13,419: INFO/MainProcess] Received task: tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195]
[2014-11-11 14:03:13,436: INFO/MainProcess] Task tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52] succeeded in 3.03667491797s: 3
[2014-11-11 14:03:13,437: INFO/MainProcess] Task tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c] succeeded in 3.03460178198s: 4
[2014-11-11 14:03:13,438: INFO/MainProcess] Task tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961] succeeded in 3.04608612298s: 2
[2014-11-11 14:03:13,439: WARNING/Worker-4] mul 4 1
[2014-11-11 14:03:13,450: WARNING/Worker-2] add 5 0
[2014-11-11 14:03:13,452: INFO/MainProcess] Task tasks.add[eccf5faa-069c-4634-826e-af5793a11c68] succeeded in 3.06420573901s: 1
[2014-11-11 14:03:13,454: WARNING/Worker-3] mul 3 1
[2014-11-11 14:03:13,481: INFO/MainProcess] Task celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] succeeded in 0.0413383140112s: None
[2014-11-11 14:03:13,485: INFO/MainProcess] Received task: tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373]
[2014-11-11 14:03:15,470: INFO/MainProcess] Task tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344] succeeded in 2.031282346s: 4
[2014-11-11 14:03:15,472: WARNING/Worker-1] mul 1 1
[2014-11-11 14:03:15,477: INFO/MainProcess] Task tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b] succeeded in 2.02354899806s: 3
[2014-11-11 14:03:15,479: WARNING/Worker-4] [<GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195, 4ffb6d04-0cf2-4300-a0de-bf53acf6662d, 538c3c60-67f8-409d-b4ce-bf09184aa03b, f696aa0a-844f-4e81-9722-0693c6e8c344, 82a6b814-53a5-45f1-a0dc-43885f92eca4]>]
[2014-11-11 14:03:15,555: WARNING/Worker-4] [None, None, 3, 4, None]
[2014-11-11 14:03:15,564: ERROR/MainProcess] Task tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373] raised unexpected: TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'",)
Traceback (most recent call last):
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/duncan/projects/celerychordtest/tasks.py", line 47, in xsum
    return sum(to_sum)
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
[2014-11-11 14:03:16,460: INFO/MainProcess] Received task: tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4]
[2014-11-11 14:03:16,462: WARNING/Worker-3] mul 5 1
[2014-11-11 14:03:16,476: WARNING/Worker-2] mul 2 1
[2014-11-11 14:03:16,476: INFO/MainProcess] Task tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d] succeeded in 3.02716274199s: 5
[2014-11-11 14:03:17,480: INFO/MainProcess] Task tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195] succeeded in 2.00813938997s: 1
[2014-11-11 14:03:18,485: INFO/MainProcess] Task tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d] succeeded in 2.00837794197s: 2
[2014-11-11 14:03:18,471: INFO/MainProcess] Task tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4] succeeded in 2.009012155s: 5

I hoped/expected the process to wait until every chain had completed before calling chord unlock.


2 Answers


As @ChillarAnand suggested, I ended up redesigning my tasks, but I did so in a way that eliminates the need for a chord. I wanted to be able to run a group of chains, which (as far as I could work out) meant I couldn't combine it with a chord.

What I do now is trigger the "final" task as part of triggering the group of chains. To make this work, the final task has to check that the other tasks have completed. Since I know my last task (in my real-world program) writes to a database, I can simply check that there is a row in the database for each item that was generated.

For anyone facing a similar problem, the relevant parts of the final function look roughly like the following:

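from celery import shared_task


class NotReady(Exception):
    pass


# bind=True passes the task instance as `self`, so the task can call self.retry()
@shared_task(bind=True, default_retry_delay=30, max_retries=10)
def output(self, generated_list):

    list_from_db = query db ...
    try:
        # raise_if_not_equal() is application-specific: it raises NotReady
        # until every generated item has its row in the database
        raise_if_not_equal(list_from_db, generated_list)
    except NotReady as exc:
        # Re-queue this task instead of blocking a worker while it waits
        raise self.retry(exc=exc, countdown=30)

    ... everything is ready do stuff ...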
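The answer doesn't show raise_if_not_equal. A minimal, hypothetical version, assuming both arguments are plain lists of item identifiers, could compare them as multisets so that the order in which rows come back from the database doesn't matter:

```python
from collections import Counter


class NotReady(Exception):
    """Raised while some generated items have no database row yet."""


def raise_if_not_equal(list_from_db, generated_list):
    # Hypothetical helper: compare as multisets, so row order is irrelevant
    # but every generated item must appear exactly as often as it was generated.
    if Counter(list_from_db) != Counter(generated_list):
        missing = Counter(generated_list) - Counter(list_from_db)
        raise NotReady("still waiting for: %r" % sorted(missing.elements()))


raise_if_not_equal([3, 1, 2], [1, 2, 3])  # all rows present: returns quietly
try:
    raise_if_not_equal([1], [1, 2, 3])
except NotReady as exc:
    print(exc)
```

Whether "ready" means an exact match or just "at least one row per item" depends on the real schema; the multiset comparison is only one reasonable choice.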

FWIW: I'll probably change the retry to back off, basing the code roughly on the following thread

This feels like a good answer, and crucially because this task throws an exception I never have a worker sat polling to find out if everything has completed.
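The backoff thread isn't reproduced here, so the following is only one common shape for exponential backoff, with an assumed base of 30 seconds (matching default_retry_delay) and an assumed cap of 600 seconds. Inside a bound task, self.request.retries holds how many times the task has already been retried, so the computed value could be passed as the countdown to self.retry:

```python
def backoff_countdown(retries, base=30, cap=600):
    """Seconds to wait before the next retry: doubles each time, never above cap.

    retries is how many retries have already happened (0 before the first retry).
    base and cap are illustrative defaults, not values from the original answer.
    """
    return min(cap, base * 2 ** retries)


# Hypothetical use inside the bound task from the answer:
#     raise self.retry(exc=exc, countdown=backoff_countdown(self.request.retries))
for n in range(6):
    print(n, backoff_countdown(n))
```

Capping the delay keeps the last few of the 10 allowed retries from drifting hours apart.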

answered 2014-11-13T17:14:10.943

A chord is a task that is executed only after all of the tasks in a group have finished executing.

If you have a simple chord like this:

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()

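It first executes the group of tasks in the header and keeps their async result objects in a list. Once all of them are ready, it iterates over that list, fetches each task's result from its async object, and passes the resulting list to the callback.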
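As a broker-free illustration of those semantics, here is a toy model built on concurrent.futures rather than Celery (run_chord is a hypothetical name, and each header entry is a plain zero-argument callable standing in for a signature). The callback runs exactly once, with every header result in header order:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def add(x, y):
    return x + y


def tsum(numbers):
    return sum(numbers)


def run_chord(header, callback):
    """Toy chord: start every header task, wait for all of them, then call back once."""
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(task) for task in header]  # start the whole group
        results = [f.result() for f in futures]           # block until each is done, in header order
    return callback(results)                              # the callback only ever sees finished values


header = [partial(add, i, i) for i in range(100)]
print(run_chord(header, tsum))  # 9900
```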

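In your case, you pass workers as the header. workers is a pipeline (effectively one single big task), so when it executes it produces only one async result instead of a list of them. That single result is the GroupResult returned by dmap, so the chord fires as soon as dmap returns, not when the chains inside that group finish. When xsum receives it, it reads .result from child results that are still pending, gets None for those, and then tries to add ints and None together, which is why it raises this error: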

TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
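To see the timing problem without a broker, here is a stdlib-only analogy (threads and futures, not Celery; the names just mirror the question's tasks). The "header" is one job that merely starts the inner jobs and returns handles to them, so anything that waits only on that job sees unfinished handles. In Celery the outcome depends on timing, which is why the log shows a mix of numbers and None; here the inner jobs are held back with a threading.Event so the result is repeatable:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=8)
release = threading.Event()  # holds the inner jobs back so the race is deterministic


def chain_task(n):
    release.wait()  # stands in for the add -> mul work that is still running
    return n


def dmap(items):
    # Like the question's dmap: starts one inner job per item and returns
    # their handles immediately instead of the finished values.
    return [pool.submit(chain_task, n) for n in items]


def xsum(handles):
    # Reads whatever is available right now: None for unfinished jobs.
    return [h.result() if h.done() else None for h in handles]


outer = pool.submit(dmap, [1, 2, 3, 4, 5])  # the whole header is this ONE job
snapshot = xsum(outer.result())             # the "chord" only waited for dmap
release.set()                               # let the inner jobs finish
pool.shutdown(wait=True)
print(snapshot)  # [None, None, None, None, None]
```

Calling sum() on that snapshot fails with the same TypeError as in the log.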

So you have to redesign your tasks so that only a group of tasks is given to the chord as its header.
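A toy model of that shape, again with threads rather than Celery: once generate's output is known, the header is built as one entry per item, each entry being a whole chain (add, then mul), and the callback waits for all of them. In Celery terms this would correspond roughly to something like chord(chain(add.s(i, 0), mul.s(1)) for i in items)(xsum.s()), but whether chains inside a chord header are handled correctly depends on the Celery version and result backend, so treat that mapping as an assumption to verify:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


def xsum(numbers):
    return sum(numbers)


def run_chain(item):
    # One chain, run step by step: add(item, 0), then mul(<previous result>, 1)
    return mul(add(item, 0), 1)


def run_chord(header, callback):
    # Wait for every header entry (each a complete chain), then call back once.
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(task) for task in header]
        return callback([f.result() for f in futures])


items = [1, 2, 3, 4, 5]                          # output of generate(), known before the chord is built
header = [partial(run_chain, i) for i in items]  # a group of chains, not one pipeline task
print(run_chord(header, xsum))  # 15
```

The key difference from the question is that the chord waits on the chains themselves rather than on a task that only starts them.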

answered 2014-11-11T15:33:30.353