使用 Celery 构建一系列顺序任务的最佳方法是什么?我的代码有一堆独立的任务(所以它们都可以是不可变的签名),但是如果其中一个任务引发异常,我想停止序列。
我一直在寻找解决方案,但我被困住了。我们使用的是 Celery 3.1.12 + RabbitMQ。
起初,我们使用一个和弦来说明标头任务成功,以便回调发生。它工作得很好,除了我们需要在标题中添加更多任务。
所以我试着在和弦上做一个链条。这也有效,但是弦挂在 PENDING 上,因为当子任务引发异常时链不会退出。
一个人为的例子:
@celery.task
def bite(food):
if food == 'salad':
raise TypeError('Throwing up. I hate {}'.format(food))
print "bite {}...".format(food)
return True
@celery.task
def chew(food):
print "chewing {}...".format(food)
return True
@celery.task
def swallow(food):
print "swallowing {}...".format(food)
return True
@celery.task
def chain_in_chord(food):
return chord(
chain(
bite.si(food), chew.si(food)
),
swallow.si(food)
).delay()
如果 food=salad,bite子任务会抛出异常。并且链的其余部分不会发生 - 这就是我想要的。但是整个和弦都卡在了 PENDING 状态,因为链卡在 PENDING 并且不会退出。
>>> res = foo.chain_in_chord('salad')
>>> res.status
'PENDING'
所以我需要:
- 找出一种中止链并在链失败时重新引发异常的方法
- 或者,想办法在和弦的标题中指定多个子任务(我似乎做不到)。
在线搜索,chain 显然表现如预期 - 所以你必须遍历 asyncResult 的每个父状态。我更喜欢一种机制,让整个事情中止并重新引发异常/跟踪......就像和弦一样,但可以选择添加多个子任务。
对于任何反馈,我们都表示感谢。谢谢。