5

我正在探索将 Kubeflow 作为部署和连接典型 ML 管道的各种组件的选项。我使用 docker 容器作为 Kubeflow 组件,到目前为止,我一直无法成功地使用ContainerOp.file_outputs对象在组件之间传递结果。

根据我对该功能的理解,创建并保存到声明为file_outputs组件之一的文件应该会导致它持久存在并可供以下组件读取。

这就是我试图在我的管道 python 代码中声明它的方式:

import kfp.dsl as dsl 
import kfp.gcp as gcp

@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
    data_collector = dsl.ContainerOp(
        name='data collector', 
        image='eu.gcr.io/kubeflow-demo-254012/data-collector',
        arguments=[ "--project_id", project_id ],
        file_outputs={ "output": '/output.txt' }
    )   
    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=[ "--project_id", project_id ]
    )
    data_preprocessor.after(data_collector)
    #TODO: add other components
if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')

data-collector.py组件的 python 代码中,我获取数据集,然后将其写入output.txt. 我能够从同一组件内的文件中读取,但不能data-preprocessor.py在我获得FileNotFoundError.

是对file_outputs基于容器的 Kubeflow 组件使用 invalid 还是我在代码中错误地使用了它?如果在我的情况下不是一个选项,是否可以在管道声明 python 代码中以编程方式创建 Kubernetes 卷并使用它们而不是file_outputs

4

2 回答 2

4

在一个 Kubeflow 管道组件中创建的文件是容器本地的。要在后续步骤中引用它,您需要将其传递为:

data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=["--fetched_dataset", data_collector.outputs['output'],
                   "--project_id", project_id,
                  ]

注意: data_collector.outputs['output']将包含文件的实际字符串内容/output.txt(不是文件的路径)。如果您希望它包含文件的路径,则需要将数据集写入共享存储(如 s3 或挂载的 PVC 卷)并将共享存储的路径/链接写入 /output.txt. data_preprocessor然后可以根据路径读取数据集。

于 2019-10-01T06:27:41.153 回答
3

主要分为三个步骤:

  1. 保存一个 output.txt 文件,该文件将包含您想要传递给下一个组件的数据/参数/任何内容。 注意:它应该在根级别,即 /output.txt

  2. 将 file_outputs={'output': '/output.txt'} 作为参数传递,如示例所示。

  3. 在你将在 dsl.pipeline 中写入的 container_op 传递参数(到需要从早期组件输出的组件的各个参数)作为 comp1.output (这里 comp1 是第一个组件,它产生输出并将其存储在 /output.txt 中)

import kfp
from kfp import dsl

def SendMsg(
    send_msg: str = 'akash'
):
    return dsl.ContainerOp(
        name = 'Print msg', 
        image = 'docker.io/akashdesarda/comp1:latest', 
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', send_msg
        ],
        file_outputs={
            'output': '/output.txt',
        }
    )

def GetMsg(
    get_msg: str
):
    return dsl.ContainerOp(
        name = 'Read msg from 1st component',
        image = 'docker.io/akashdesarda/comp2:latest',
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', get_msg
        ]
    )

@dsl.pipeline(
    name = 'Pass parameter',
    description = 'Passing para')
def  passing_parameter(send_msg):
    comp1 = SendMsg(send_msg)
    comp2 = GetMsg(comp1.output)


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')
于 2019-12-27T10:40:12.733 回答