我需要创建一个可以使用此工作流处理任务的批处理:
| task 4
| task 3 ->| task 4
| task 2 -> | task 4
| task 3 ->| task 4
input -> task 1 ->
| task 2 -> ...
- 任务 #1 处理输入数据并返回列表列表。
- 任务 #2 从任务 #1 接收列表,并返回列表列表。
- 任务 #3 从任务 #2 接收列表,并返回列表列表。
- 任务#4 从任务#4 接收列表并处理列表中的数据。
例如,任务 #1 返回[[],[],[],[]]
. 这意味着流程必须并行运行 4 个任务 #2。每个任务 #2 返回[[],[],[]]
. 现在我们必须有 4x3 任务 #3。然后任务 #3 返回[[],[]]
。最后流程必须运行 4x3x2 任务 #4。
可以使用 Prefect Flow 吗?我尝试使用映射功能,但它似乎不支持如此复杂的工作流模式(或者我可能没有正确使用它)。
with Flow('test') as flow:
res1 = task1()
res2 = task2.map(res1)
res3 = task3.map(res2)
res4 = task4.map(res3)
当我运行流程 task1 返回正确数量的列表。然后 flow 创建了 4 个 task2,每个 task2 返回三个列表的列表。但是,流程并没有创建 12 个 task3,而是只创建了 4 个。每个 task3 接收 4 个列表的列表,因为它是使用 task1 而不是来自 task2 的 1 个列表创建的。
关于如何创建这样的工作流程的任何想法?