我正在尝试设置一个简单的 KubeFlow 管道,但我无法以适用于 KubeFlow 的方式打包依赖项。
该代码只是下载一个配置文件并对其进行解析,然后将解析后的配置传回。
但是,为了解析配置文件,它需要访问另一个内部 python 包。
我有一个.tar.gz
托管在同一个项目的存储桶上的包的存档,并将包的 URL 添加为依赖项,但我收到一条错误消息说tarfile.ReadError: not a gzip file
.
我知道该文件很好,因此在存储桶上托管或 kubeflow 安装依赖项的方式是一些中间问题。
这是一个最小的例子:
from kfp import compiler
from kfp import dsl
from kfp.components import func_to_container_op
from google.protobuf import text_format
from google.cloud import storage
import training_reader
def get_training_config(working_bucket: str,
working_directoy: str,
config_file: str) -> training_reader.TrainEvalPipelineConfig:
download_file(working_bucket, os.path.join(working_directoy, config_file), "ssd.config")
pipeline_config = training_reader.TrainEvalPipelineConfig()
with open("ssd.config", 'r') as f:
text_format.Merge(f.read(), pipeline_config)
return pipeline_config
config_op_packages = ["https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz",
"google-cloud-storage",
"protobuf"
]
training_config_op = func_to_container_op(get_training_config,
base_image="tensorflow/tensorflow:1.15.2-py3",
packages_to_install=config_op_packages)
def output_config(config: training_reader.TrainEvalPipelineConfig) -> None:
print(config)
output_config_op = func_to_container_op(output_config)
@dsl.pipeline(
name='Post Training Processing',
description='Building the post-processing pipeline'
)
def ssd_postprocessing_pipeline(
working_bucket: str,
working_directory: str,
config_file:str):
config = training_config_op(working_bucket, working_directory, config_file)
output_config_op(config.output)
pipeline_name = ssd_postprocessing_pipeline.__name__ + '.zip'
compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)