我是 Celery 的新手,我正在尝试了解它是否可以解决我的问题。
我需要启动一些任务 ( An
),然后在B
完成这些任务后运行另一个任务 ( )。问题是任务An
是按顺序添加的,我不想在开始第一个之前等待最后一个添加。我可以将任务配置B
为在任务完成后执行An
吗?
现在到真实场景:
- 任务
An
- 处理用户上传的文件(每个文件上传后添加) - 任务
B
- 对处理所有上传文件的结果做一些事情
也欢迎替代解决方案
我是 Celery 的新手,我正在尝试了解它是否可以解决我的问题。
我需要启动一些任务 ( An
),然后在B
完成这些任务后运行另一个任务 ( )。问题是任务An
是按顺序添加的,我不想在开始第一个之前等待最后一个添加。我可以将任务配置B
为在任务完成后执行An
吗?
现在到真实场景:
An
- 处理用户上传的文件(每个文件上传后添加)B
- 对处理所有上传文件的结果做一些事情也欢迎替代解决方案
当然你可以做到这一点,芹菜画布支持许多选项,包括你需要的行为,在一组任务之后运行一个任务......它被称为“和弦”,例如:
from celery import chord
from tasks import task_upload1, task_upload2, task_upload3, final_execution
result = chord(task_upload1.s(), task_upload2.s(), task_upload3.s())(final_execution.s())
get_required_result = result.get()
您可以参考此链接了解更多详情
使用 RabbitMQ,您可以使用消息确认和聚合器模式获得准确的行为。
您启动工作人员,它使用消息(A
)并做一些工作(在您的情况下处理用户上传的文件),但完成后不会发送ack
。相反,它需要下一个消息表单队列,如果它A
再次成为任务,他正在做同样的事情。在某个时候,他将收到任务B
并可以处理所有以前A
的结果,所有赎罪并发ack
送给所有人。
不幸的是,这种情况不能用 Celery 完成,因为你必须在创建时间时指定所有 A
任务和最终B
任务(链、弦、回调等)。
或者,您可以将Task.id
每个成功A
的任务保存在单独的队列(不是 Celery 队列)中,并在执行B
任务时处理此消息。Celery 适合这种算法。