我应该如何在管道 A 完成后启动管道 B,并将管道 A 的输出用于管道 B?
一段代码作为起点:
from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid
@solid
def pipeline1_task1(context) -> Nothing:
context.log.info('in pipeline 1 task 1')
@solid(input_defs=[InputDefinition("start", Nothing)],
output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
context.log.info('in pipeline 1 task 2')
return 'my cool output'
@pipeline
def pipeline1():
pipeline1_task2(pipeline1_task1())
@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
context.log.info('in pipeline 2 task 1' + print_str)
@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
context.log.info('in pipeline 2 task 2')
@pipeline
def pipeline2():
pipeline2_task2(pipeline2_task1())
if __name__ == '__main__':
# run pipeline 1
# store outputs
# call pipeline 2 using the above outputs
这里我们有三个管道:pipeline1
有两个实体,可能做我们想做的任何事情,并从第二个实体返回输出。pipeline2
应该使用 的输出pipeline1_task2
,最终做另一件工作并打印第一个管道的输出。
我应该如何“连接”两条管道?