0

我使用 python 函数制作了两个组件,并尝试使用文件在它们之间传递数据,但我无法这样做。我想计算总和,然后使用文件将答案发送给其他组件。下面是部分代码(代码在没有文件传递的情况下工作)。请协助。

# Define your components code as standalone python functions:======================
    def add(a: float, b: float, f: comp.OutputTextFile(float)) -> NamedTuple(
        'AddOutput',
        [
            ('out', comp.OutputTextFile(float))
        ]):
        '''Calculates sum of two arguments'''
        sum = a+b

        f.write(sum)

        from collections import namedtuple

        addOutput = namedtuple(
            'AddOutput',
            ['out'])
        return addOutput(f)  # the metrics will be uploaded to the cloud


    def multiply(c:float, d:float, f: comp.InputTextFile(float) ):
        '''Calculates the product'''
        product = c * d

        print(f.read())


add_op = comp.func_to_container_op(add, output_component_file='add_component.yaml')
    product_op = comp.create_component_from_func(multiply, 
output_component_file='multiple_component.yaml')


@dsl.pipeline(
      name='Addition-pipeline',
      description='An example pipeline that performs addition calculations.'
    )
    def my_pipeline(a, b='7', c='4', d='1'):

        add_op = pl_comp_list[0]
        product_op = pl_comp_list[1]

        first_add_task = add_op(a, 4)
        second_add_task = product_op(c, d, first_add_task.outputs['out'])
4

2 回答 2

2

这是我测试过的管道的一个稍微简化的版本,它可以工作。OutputTextFile传递给和的类类型无关紧要InputTextFile。它将被读取和写入为str. 所以这是你应该改变的:

  • 写信时OutputTextFilesum_float to str
  • 读取时InputTextFilef.read()str to float
import kfp
from kfp import dsl
from kfp import components as comp


def add(a: float, b: float, f: comp.OutputTextFile()):
    '''Calculates sum of two arguments'''
    sum_ = a + b
    f.write(str(sum_)) # cast to str
    return sum_


def multiply(c: float, d: float, f: comp.InputTextFile()):
    '''Calculates the product'''
    in_ = float(f.read()) # cast to float
    product = c * d * in_
    print(product)
    return product


add_op = comp.func_to_container_op(add,
                                   output_component_file='add_component.yaml')
product_op = comp.create_component_from_func(
    multiply, output_component_file='multiple_component.yaml')


@dsl.pipeline(
    name='Addition-pipeline',
    description='An example pipeline that performs addition calculations.')
def my_pipeline(a, b='7', c='4', d='1'):

    first_add_task = add_op(a, b)
    second_add_task = product_op(c, d, first_add_task.output)


if __name__ == "__main__":
    compiled_name = __file__ + ".yaml"
    kfp.compiler.Compiler().compile(my_pipeline, compiled_name)
于 2021-01-22T03:21:48.533 回答
0

('out', comp.OutputTextFile(float))

这不是真的有效。OutputTextFile注解(和其他类似注解)只能在​​函数参数中使用。函数返回值仅适用于要作为值(而不是文件)输出的输出。

由于您已经有了f: comp.OutputTextFile(float)可以完全删除函数返回值的位置。然后将f输出传递给下游组件:product_op(c, d, first_add_task.outputs['f']).

于 2021-07-19T07:27:22.223 回答