2

我想在 Kubeflow 管道中编写一个具有 2 个组件的管道:A 和 B

A 的输出是图像路径列表。

我想为每个图像路径运行一个 docker 图像 (B)

从我看到的dsl.ContainerOpB 可以等待 A 的输出,但我不知道如何创建 B 的多个实例

4

1 回答 1

2

更新:这最近发生了变化,可以简单地通过使用ParallerlFor输出来完成。参考:https ://stackoverflow.com/a/59292863/4438213

----- 低于 KF 0.6 及之前 ----

这是 Kubeflow DSL 的一个公认问题:使用一个组件 (A) 的输出并对其进行迭代,为前一个输出中的每个条目运行一个新组件 (B)。这很难,因为 Kubeflow 使用的 DSL 是在编译时,而且不可能知道当时输出中有多少元素。

参考:

从 KF v0.6 开始支持的唯一动态(运行时)迭代形式是:dsl-recursion。我已经通过 2 种方式使其工作,缺少上述问题的待处理工作:

如果 A 的结果的大小在每次运行中都是一个常数并且是预先知道的,那么这很简单。

案例 A:上一步的输出大小已知

  1. 创建一个轻量级组合以获取给定索引处的图像路径
# Write a python code to extract the path from
# the string of refs the previous step returns 
def get_path(str_of_paths: str, idx: int) -> str:
    return str_of_paths.split(" ")[idx] # or some other delimiter
  1. 将 python 代码包装在Kubeflow 轻量级组件中
get_img_path_comp = comp.func_to_container_op(get_path,base_image='tensorflow/tensorflow') # or any appropriate base image

然后管道 dsl 代码中的常规 for 循环将起作用

image_path_res = ContainerOP_A() # run your container Op
for idx in range(4):
    path = get_path(image_path_res.output, i)
    ContainerOp_B(path.output)

案例 B:当上一步的输出不是固定大小时

这有点棘手和复杂。从 KF v0.6 开始,Kubeflow 允许的唯一动态循环形式是dsl-recursion

选项1

  1. 创建 2 个轻量级组件,一个用于计算结果的大小,sizer_op然后get_img_path_comp从上面重复使用相同的组件。
@dsl.component
def sizer_op(str_of_refs) -> int:
    return len(str_of_refs.split("|"))
sizer_op_comp = comp.func_to_container_op(sizer_op,base_image='tensorflow/tensorflow')

然后你可以运行recusive函数

@dsl.component
def subtracter_op(cur_idx) -> int:
    return cur_idx - 1
sub_op_comp = comp.func_to_container_op(subtracter_op,base_image='tensorflow/tensorflow')

@dsl.graph_component
def recursive_run(list_of_images, cur_ref):
    with dsl.Condition(cur_ref >= 0):
        path = get_path(image_path_res.output, i)
        ContainerOp_B(path.output)

        # call recursively
        next_ref = sub_op_comp(cur_ref)
        recursive_run(list_of_images, next_ref)


image_path_res = ContainerOP_A() # run your container Op
sizer = sizer_op_comp(image_path_res)
recursive_run(image_path_res.output, sizer.output)

选项 2

运行 ContainerOp_A 后,创建一个 Kubeflow 组件,该组件从 ContainerOp_A 读取结果,在 python 代码中解析结果,然后使用 kfclient 生成仅运行 Containerop_B 的新运行。您可以使用以下方式连接到 KF 管道客户端:

kf_client = Client(host=localhost:9990)

参考:kf_client

于 2019-10-14T20:39:49.183 回答