0

我正在尝试设置一个简单的 KubeFlow 管道,但我无法以适用于 KubeFlow 的方式打包依赖项。

该代码只是下载一个配置文件并对其进行解析,然后将解析后的配置传回。

但是,为了解析配置文件,它需要访问另一个内部 python 包。

我有一个.tar.gz托管在同一个项目的存储桶上的包的存档,并将包的 URL 添加为依赖项,但我收到一条错误消息说tarfile.ReadError: not a gzip file.

我知道该文件很好,因此在存储桶上托管或 kubeflow 安装依赖项的方式是一些中间问题。

这是一个最小的例子:

from kfp import compiler
from kfp import dsl
from kfp.components import func_to_container_op
from google.protobuf import text_format
from google.cloud import storage
import training_reader

def get_training_config(working_bucket: str,
                        working_directoy: str,
                        config_file: str) -> training_reader.TrainEvalPipelineConfig:
    download_file(working_bucket, os.path.join(working_directoy, config_file), "ssd.config")
    pipeline_config = training_reader.TrainEvalPipelineConfig()
    with open("ssd.config", 'r') as f:
        text_format.Merge(f.read(), pipeline_config)
    return pipeline_config

config_op_packages = ["https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz",
                      "google-cloud-storage",
                      "protobuf"
                      ]
training_config_op = func_to_container_op(get_training_config,
                                          base_image="tensorflow/tensorflow:1.15.2-py3",
                                          packages_to_install=config_op_packages)

def output_config(config: training_reader.TrainEvalPipelineConfig) -> None:
    print(config)

output_config_op = func_to_container_op(output_config)

@dsl.pipeline(
    name='Post Training Processing',
    description='Building the post-processing pipeline'
)
def ssd_postprocessing_pipeline(
    working_bucket: str,
    working_directory: str,
    config_file:str):
    config = training_config_op(working_bucket, working_directory, config_file)
    output_config_op(config.output)

pipeline_name = ssd_postprocessing_pipeline.__name__ + '.zip'
compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)
4

1 回答 1

1

https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gzIRL 需要身份验证。尝试以隐身模式下载它,您将看到登录页面而不是文件。将 URL 更改为https://storage.googleapis.com/my_bucket/packages/training-reader-0.1.tar.gz适用于公共对象,但您的对象不是公共的。

您唯一能做的(如果您不能公开软件包)是使用google.cloud.storage库或gsutil程序从存储桶中下载文件,然后手动安装它subprocess.run([sys.executable, '-m', 'pip', 'install', ...])

你从哪里下载数据?

目的是什么

    pipeline_config = training_reader.TrainEvalPipelineConfig()
    with open("ssd.config", 'r') as f:
        text_format.Merge(f.read(), pipeline_config)
    return pipeline_config

为什么不直接执行以下操作:


def get_training_config(
    working_bucket: str,
    working_directory: str,
    config_file: str,
    output_config_path: OutputFile('TrainEvalPipelineConfig'),
):
    download_file(working_bucket, os.path.join(working_directoy, config_file), output_config_path)

kubeflow 安装依赖项的方式。

将您的组件导出到可加载component.yaml,您将看到 KFP 轻量级组件如何安装依赖项:

training_config_op = func_to_container_op(
    get_training_config,
    base_image="tensorflow/tensorflow:1.15.2-py3",
    packages_to_install=config_op_packages,
    output_component_file='component.yaml',
)

PS一些小信息:

@dsl.pipeline(

除非您想使用dsl-compile命令行程序,否则不需要

pipeline_name = ssd_postprocessing_pipeline。名称+ '.zip' compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)

您知道您可以kfp.Client(host=...).create_run_from_pipeline_func(ssd_postprocessing_pipeline, arguments={})立即运行管道吗?

于 2020-06-19T23:55:02.823 回答