0

我正在构建一个原型管道,它做两件事:

  1. (实心)从现有目录中清除文件
  2. (实心)运行批处理以将数据转储到该目录中。

步骤#1 都是副作用,没有输出传递给#2。是否可以在管道中表达这两种实体之间的依赖关系?

4

1 回答 1

2

我认为https://docs.dagster.io/examples/nothing中的以下代码片段应该适用于您的用例:

from dagster import Nothing

@solid
def create_table_1(_) -> Nothing:
    get_database_connection().execute("create table_1 as select * from some_source_table")


@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2(_):
    get_database_connection().execute("create table_2 as select * from table_1")


@pipeline
def my_pipeline():
    create_table_2(create_table_1())
于 2021-01-07T22:15:01.337 回答