我也遇到了动态“扇出”然后使用 Kubeflow 管道“扇入”的问题。也许有点笨拙,但我使用了一个安装的 PVC 声明来克服这个问题。
Kubeflow 允许您使用VolumeOp
(链接此处)安装已知的 PVC 或动态创建新的 PVC。此片段显示如何使用已知的 PVC。
pvc_name = '<available-pvc-name>'
pvc_volume_name = '<pvc-uuid>' # pass the pvc uuid here
# Op 1 creates a list to iterate over
op_1 = dsl.ContainerOp(
name='echo',
image='library/bash:4.4.23',
command=['sh', '-c'],
arguments=['echo "[1,2,3]"> /tmp/output.txt'],
file_outputs={'output': '/tmp/output.txt'})
# Using withParam here to iterate over the results from op1
# and writing the results of each step to its own PVC
with dsl.ParallelFor(op_1.output) as item:
op_2 = dsl.ContainerOp(
name='iterate',
image='library/bash:4.4.23',
command=['sh', '-c'],
arguments=[f"echo item-{item} > /tmp/output.txt; " # <- write to output
f"mkdir -p /mnt/{{workflow.uid}}; " # <- make a dir under /mnt
f"echo item-{item}\n >> /mnt/{{workflow.uid}}"], # <- append results from each step to the PVC
file_outputs={'output': '/tmp/output.txt'},
# mount the PVC
pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})
op_3 = dsl.ContainerOp(
name='echo',
image='library/bash:4.4.23',
command=['sh', '-c'],
arguments=[f"echo /mnt/{{workflow.uid}} > /tmp/output.txt"],
# mount the PVC again to use
pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
file_outputs={'output': '/tmp/output_2.txt'}).after(op_2)
确保在最后使用op_3
循环之后运行。op_2
after(op_2)
注意:这可能是一种严厉的方法,如果 KFP 允许将其作为 KF 编译器的一部分,可能会有更好的解决方案,但我无法让它工作。如果在环境中创建 PVC 很容易,这可能适用于您的情况。