我目前正在将 Redis 用于包含几个步骤的工作流。对于每个步骤,工作人员从队列中抓取有效负载,完成后将其推送到下一个队列,下一个工作人员可以将其进一步处理。如果发生异常,工作人员会将任务放入一个特殊的队列中。
因此,关于通过应用程序的流程的应用程序逻辑在于工作人员本身。我现在想改用芹菜。
我知道在 Celery 中您可以使用子任务,但我看不到您如何表达针对不同条件(例如异常和超时)的特定错误处理。你应该使用不同的队列还是使用子任务,在代码中会是什么样子?
我目前正在将 Redis 用于包含几个步骤的工作流。对于每个步骤,工作人员从队列中抓取有效负载,完成后将其推送到下一个队列,下一个工作人员可以将其进一步处理。如果发生异常,工作人员会将任务放入一个特殊的队列中。
因此,关于通过应用程序的流程的应用程序逻辑在于工作人员本身。我现在想改用芹菜。
我知道在 Celery 中您可以使用子任务,但我看不到您如何表达针对不同条件(例如异常和超时)的特定错误处理。你应该使用不同的队列还是使用子任务,在代码中会是什么样子?
我现在已经更彻底地阅读了这些文档,并且还做了一些测试,这很有效:
问题是将任务串联在一起,以便它们一个接一个地发生,但同时能够处理错误情况并“中断”流程并做其他事情,而不仅仅是中止。
您可以使用link将任务串在一起,如果其中有一个额外的参数 *link_error* ,它将用于失败。从阅读:
http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks 我做了这个:
res = add.apply_async((2, 2), link=mul.s(16), link_error=onerror.s())
这三个任务是 add、mul 和 onerror。Add 将两个数字相加, mul 将两个数字相乘。所以这会将 2 和 2 相加,然后总和将结转到下一步 (mul) 并乘以 16。但是,如果添加代码有错误,或者有错误的数据,或者如果发生其他错误但可检测到的错误, add 会抛出异常,并且会运行 onerror 任务而不是 mul。onerror 任务获取作业的 uuid,并且可以在数据库后端中查找作业(如果配置了这样的作业)。然后 onerror 任务可以归档作业或发送电子邮件或其他任何内容。