3

我需要创建一个可以使用此工作流处理任务的批处理:

                                             | task 4
                                  | task 3 ->| task 4
                       | task 2 ->           | task 4
                           
                                  | task 3 ->| task 4
    input ->  task 1 ->
                       | task 2 -> ... 
  • 任务 #1 处理输入数据并返回列表列表。
  • 任务 #2 从任务 #1 接收列表,并返回列表列表。
  • 任务 #3 从任务 #2 接收列表,并返回列表列表。
  • 任务#4 从任务#4 接收列表并处理列表中的数据。

例如,任务 #1 返回[[],[],[],[]]. 这意味着流程必须并行运行 4 个任务 #2。每个任务 #2 返回[[],[],[]]. 现在我们必须有 4x3 任务 #3。然后任务 #3 返回[[],[]]。最后流程必须运行 4x3x2 任务 #4。

可以使用 Prefect Flow 吗?我尝试使用映射功能,但它似乎不支持如此复杂的工作流模式(或者我可能没有正确使用它)。

with Flow('test') as flow:
   res1 = task1()
   res2 = task2.map(res1)
   res3 = task3.map(res2)
   res4 = task4.map(res3)

当我运行流程 task1 返回正确数量的列表。然后 flow 创建了 4 个 task2,每个 task2 返回三个列表的列表。但是,流程并没有创建 12 个 task3,而是只创建了 4 个。每个 task3 接收 4 个列表的列表,因为它是使用 task1 而不是来自 task2 的 1 个列表创建的。

关于如何创建这样的工作流程的任何想法?

4

0 回答 0