3

我想通过传递任何类型的数据来在组件之间建立管道连接,只是为了让它看起来像带箭头的流程图一样有组织。现在它就像下面在此处输入图像描述

无论 docker 容器是否生成输出,我都希望在组件之间传递一些示例数据。但是,如果需要对 docker 容器代码或 .yaml 进行任何更改,请告诉我

KFP 代码

import os
from pathlib import Path
import requests

import kfp

#Load the component
component1 = kfp.components.load_component_from_file('comp_typed.yaml')
component2 = kfp.components.load_component_from_file('component2.yaml')
component3 = kfp.components.load_component_from_file('component3.yaml')
component4 = kfp.components.load_component_from_file('component4.yaml')

#Use the component as part of the pipeline
@kfp.dsl.pipeline(name='Document Processing Pipeline', description='Document Processing Pipeline')
def data_passing():
    task1 = component1()
    task2 = component2(task1.output)
    task3 = component3(task2.output)
    task4 = component4(task3.output)

comp_typed.yaml 代码

name: DPC
description: This is an example
implementation:
  container:
    image: gcr.io/pro1-in-us/dpc_comp1@sha256:3768383b9cd694936ef00464cb1bdc7f48bc4e9bbf08bde50ac7346f25be15de
    command: [python3, /dpc_comp1.py,]

组件2.yaml

name: Custom_Plugin_1
description: This is an example
implementation:
  container:
    image: gcr.io/pro1-in-us/plugin1@sha256:16cb4aa9edf59bdf138177d41d46fcb493f84ce798781125dc7777ff5e1602e3
    command: [python3, /plugin1.py,]

我尝试了这个这个,但除了错误之外什么都没有。我是 python 和 kubeflow 的新手。我应该对使用 KFP SDK 在所有 4 个组件之间传递数据进行哪些代码更改。数据可以是文件/字符串

假设,组件 1 从 gs 存储桶下载一个 .pdf 文件,我可以将相同的文件提供给下一个下游组件吗?组件 1 将文件下载到组件 1 docker 容器的“/tmp/doc_pages”位置,我认为该容器是该特定容器的本地文件,并且下游组件无法读取它们?

4

3 回答 3

3

这个 notebook描述了如何在 KFP 组件之间传递数据,可能会有用。它包含“小数据”的概念,直接传递;与您写入文件的“大数据”相比,然后(如示例笔记本中所示)输入和输出文件的路径由系统选择并传递给函数(作为字符串)。

如果您不需要在步骤之间传递数据,但想要指定步骤排序依赖项(例如op2,直到op1完成才运行),您可以在管道定义中指出这一点,如下所示:

op2.after(op1)
于 2020-07-07T15:27:01.787 回答
0

如果您不想通过输出使用依赖关系或在组件之间传递任何数据,则可以参考上一步中的 PVC 来显式调用依赖关系。

示例:您可以创建一个 PVC 来存储数据。

vop = dsl.VolumeOp(name="pvc",
                   resource_name="pvc", size=<size>, 
                   modes=dsl.VOLUME_MODE_RWO,)

在组件中使用它:

download = dsl.ContainerOp(name="download",image="", 
                           command=[" "], arguments=[" "], 
                           pvolumes={"/data": vop.volume},)

现在您可以调用下载和训练之间的依赖关系,如下所示:

train = dsl.ContainerOp(name="train",image="", 
                        command=[" "], arguments=[" "], 
                        pvolumes={"/data": download.pvolumes["/data"]},)
于 2020-07-13T20:09:00.517 回答
0

除了艾米的出色回答:

您的管道是正确的。建立组件之间依赖关系的最好方法是建立数据依赖关系

让我们看看您的管道代码:

task2 = component2(task1.output)

您正在传递task1to的输出component2。这应该会产生您想要的依赖关系。但是有几个问题(如果您尝试编译,您的管道将显示编译错误):

  1. component1需要有输出
  2. component2需要有输入
  3. component2需要有一个输出(以便您可以将其传递给 component3)

等等。

让我们添加它们:

name: DPC
description: This is an example
outputs:
- name: output_1
implementation:
  container:
    image: gcr.io/pro1-in-us/dpc_comp1@sha256:3768383b9cd694936ef00464cb1bdc7f48bc4e9bbf08bde50ac7346f25be15de
    command: [python3, /dpc_comp1.py, --output-1-path, {outputPath: output_1}]
name: Custom_Plugin_1
description: This is an example
inputs:
- name: input_1
outputs:
- name: output_1
implementation:
  container:
    image: gcr.io/pro1-in-us/plugin1@sha256:16cb4aa9edf59bdf138177d41d46fcb493f84ce798781125dc7777ff5e1602e3
    command: [python3, /plugin1.py, --input-1-path, {inputPath: input_1}, --output-1-path, {outputPath: output_1}]

通过这些更改,您的管道应该编译并显示您想要的依赖项。

请查看有关从命令行程序创建组件的教程。

于 2020-07-11T05:49:32.983 回答