1

我应该如何在管道 A 完成后启动管道 B,并将管道 A 的输出用于管道 B?

一段代码作为起点:

from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid

@solid
def pipeline1_task1(context) -> Nothing:
    context.log.info('in pipeline 1 task 1')


@solid(input_defs=[InputDefinition("start", Nothing)],
       output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
    context.log.info('in pipeline 1 task 2')
    return 'my cool output'


@pipeline
def pipeline1():
    pipeline1_task2(pipeline1_task1())


@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
    context.log.info('in pipeline 2 task 1' + print_str)


@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
    context.log.info('in pipeline 2 task 2')


@pipeline
def pipeline2():
    pipeline2_task2(pipeline2_task1())


if __name__ == '__main__':
    # run pipeline 1
    # store outputs
    # call pipeline 2 using the above outputs

这里我们有三个管道:pipeline1有两个实体,可能做我们想做的任何事情,并从第二个实体返回输出。pipeline2应该使用 的输出pipeline1_task2,最终做另一件工作并打印第一个管道的输出。

我应该如何“连接”两条管道?

4

2 回答 2

1

使一个管道在另一个管道之后执行的一种方法是通过传感器。在 Dagster 中推荐的方法是使用“资产传感器”。第一个管道中的一个实体产生一个AssetMaterialization,第二个管道中的传感器等待该资产被物化。

这是一个示例:https ://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors

于 2021-03-19T00:25:35.880 回答
0

在玩了一会儿之后,我得出了以下解决方案(在我看来不是太优雅,但至少它有效):

from dagster import (InputDefinition, OutputDefinition,
                     execute_pipeline, pipeline, solid, Nothing, repository)


@solid
def pipeline1_task1(context) -> Nothing:
    context.log.info('in pipeline 1 task 1')


@solid(input_defs=[InputDefinition("start", Nothing)],
output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
    context.log.info('in pipeline 1 task 2')
    return '\n\n\nmy cool output\n\n\n'


@pipeline
def pipeline1():
    pipeline1_task2(pipeline1_task1())


@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
    context.log.info('in pipeline 2 task 1' + print_str)


@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
    context.log.info('in pipeline 2 task 2')


@pipeline
def pipeline2():
    pipeline2_task2(pipeline2_task1())


@solid
def run_pipelines(context):
    pout = execute_pipeline(pipeline1)
    some_str = pout.result_for_solid('pipeline1_task2')
    conf = {'solids': {'pipeline2_task1': {'inputs': {'print_str': some_str.output_value('some_str')}}}}
    execute_pipeline(pipeline2, run_config=conf)

@pipeline
def pipeline3():
    run_pipelines()


@repository
def repo():
    return [pipeline1, pipeline2, pipeline3]

if __name__ == '__main__':
    execute_pipeline(pipeline3)

所以......在这里我已经定义pipeline3而不是在底部条件中做所有事情。管道 3 只有一个实体,它执行pipeline1并获取实体的输出pipeline1_task2。然后它创建一个包含该输出的配置,some_str并将此配置传递给execute_pipeline第二个管道。

在这里,我们还定义了一个@repository函数,它是 Dagster 确定所有三个管道是一个整体的一部分所必需的。

整个事情在dagit. 尽管每个管道都与其他管道分开显示,但三个管道显示在一个存储库中(如代码中所定义)。 在此处输入图像描述

于 2021-03-17T11:59:34.423 回答