所以我有两个任务需要并行调用,然后如果它们成功/失败我需要做一些事情,我还需要将它们的结果用于另一个任务。我将在这里试一试白板,以使其更加清晰:
| |
| |
add subtract
/ \ / \
/ \ / \
success \ / success
or fail \ / or fail
task \ / task
\ /
\ /
\ /
email results
我认为和弦可能是最好的选择,因为我可以并行执行加法和减法,然后获取它们的结果并执行另一个回调。
这是我的任务的代码:
@shared_task
def add(x):
print 'adding'
return x + 5
@shared_task
def subtract(x):
print 'subtracting'
return x - 5
@shared_task(ignore_result=True)
def add_subtract_task_success(result):
if result == 10:
print 'task_success'
@shared_task
def add_subtract_task_failure(parent_task_id):
print 'link_error failure'
@shared_task
def email_results(dict):
print 'This is to be emailed: ' + str(dict)
所以在 django shell 中玩耍,我尝试了以下方法:
>>> callback = email_results.s()
>>> header = [add.s(5).apply_async(link=add_subtract_task_success.s(), link_error=add_subtract_task_failure.s()), subtract.s(5).apply_async(link=add_subtract_task_success.s(), link_error=add_subtract_task_failure.s())]
>>>
>>> result = chord(header)(callback)
这是它产生的异常:
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/canvas.py", line 636, in __call__
return self.apply_async((), {'body': body} if body else {}, **options)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/canvas.py", line 631, in apply_async
parent = _chord(self.tasks, body, args, **options)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/app/task.py", line 420, in __call__
return self.run(*args, **kwargs)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/celery-3.1.17-py2.7.egg/celery/app/builtins.py", line 329, in run
maybe_signature(s, app=app).clone() for s in tasks
AttributeError: 'AsyncResult' object has no attribute 'clone'
欢迎任何想法和建议,非常感谢。