6

我在动态列表上使用 ParallelFor。我想从循环中收集所有输出,并将它们传递给另一个 ContainerOp。
像下面这样的东西,显然不起作用,因为outputs列表是静态的。

with dsl.ParallelFor(op1.output) as item:
    op2 = dsl.ContainerOp(
      name='op2',
      ...
      file_outputs={
         'outputs': '/outputs.json',
    })
    outputs.append(op2.output)


op3 = dsl.ContainerOp(
   name='op3',
   ...
   arguments=['--input': outputs]  # won't work
)
4

4 回答 4

3

我也遇到了动态“扇出”然后使用 Kubeflow 管道“扇入”的问题。也许有点笨拙,但我使用了一个安装的 PVC 声明来克服这个问题。

Kubeflow 允许您使用VolumeOp(链接此处)安装已知的 PVC 或动态创建新的 PVC。此片段显示如何使用已知的 PVC。

    pvc_name = '<available-pvc-name>' 
    pvc_volume_name = '<pvc-uuid>' # pass the pvc uuid here

    # Op 1 creates a list to iterate over
    op_1 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "[1,2,3]"> /tmp/output.txt'],
            file_outputs={'output': '/tmp/output.txt'})

    # Using withParam here to iterate over the results from op1
    # and writing the results of each step to its own PVC
    with dsl.ParallelFor(op_1.output) as item:
        op_2 = dsl.ContainerOp(
            name='iterate',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo item-{item} > /tmp/output.txt; "  # <- write to output  
                       f"mkdir -p /mnt/{{workflow.uid}}; "  # <- make a dir under /mnt
                       f"echo item-{item}\n >> /mnt/{{workflow.uid}}"],  # <- append results from each step to the PVC
            file_outputs={'output': '/tmp/output.txt'},
            # mount the PVC
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})

    op_3 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo /mnt/{{workflow.uid}} > /tmp/output.txt"],
            # mount the PVC again to use
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
            file_outputs={'output': '/tmp/output_2.txt'}).after(op_2)

确保在最后使用op_3循环之后运行。op_2after(op_2)

注意:这可能是一种严厉的方法,如果 KFP 允许将其作为 KF 编译器的一部分,可能会有更好的解决方案,但我无法让它工作。如果在环境中创建 PVC 很容易,这可能适用于您的情况。

于 2019-12-29T01:12:30.017 回答
2

不幸的是,Ark-kun 的解决方案对我不起作用。但是如果我们事先知道输入的数量,有一种简单的方法可以实现扇入工作流。我们可以像这样预先计算管道 DAG:

@kfp.components.create_component_from_func
def my_transformer_op(item: str) -> str:
    return item + "_NEW"


@kfp.components.create_component_from_func
def my_aggregator_op(items: list) -> str:
    return "HELLO"


def pipeline(array_of_arguments):
    @dsl.pipeline(PIPELINE_NAME, PIPELINE_DESCRIPTION)
    def dynamic_pipeline():
        outputs = []
        for i in array_of_arguments:
            outputs.append(my_transformer_op(str(i)).output)
        my_aggregator_op(outputs)
    return dynamic_pipeline

...

    run_id = client.create_run_from_pipeline_func(
        pipeline(data_samples_chunks), {},
        run_name=PIPELINE_RUN,
        experiment_name=PIPELINE_EXPERIMENT).run_id

管道图

于 2020-08-02T17:24:51.547 回答
1

这仅在您事先知道输入/输出时才有效,因此它不是真正动态的。但它确实解决了我当前的用例。

实际上,我发现以这种方式管理管道非常困难,我建议您查看 Ploomber https://github.com/ploomber/ploomber/?ref=stacko

设置管道和依赖项非常容易,并且它与大多数提供者(气流、argo 等)集成。我知道他们目前正在开发 Kubeflow 连接器(类似于 Kale,但更简单)。无论如何,它确实让我的生活更轻松。

于 2021-12-26T20:32:39.760 回答
0

问题是op3没有正确引用输出op2作为输入参数。尝试这个:

op3 = dsl.ContainerOp(
    ...
    arguments=['--input': op2.outputs['outputs']]
)
于 2019-12-25T00:15:28.060 回答