我正在构建一个原型管道,它做两件事:
- (实心)从现有目录中清除文件
- (实心)运行批处理以将数据转储到该目录中。
步骤#1 都是副作用,没有输出传递给#2。是否可以在管道中表达这两种实体之间的依赖关系?
我正在构建一个原型管道,它做两件事:
步骤#1 都是副作用,没有输出传递给#2。是否可以在管道中表达这两种实体之间的依赖关系?
我认为https://docs.dagster.io/examples/nothing中的以下代码片段应该适用于您的用例:
from dagster import Nothing
@solid
def create_table_1(_) -> Nothing:
get_database_connection().execute("create table_1 as select * from some_source_table")
@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2(_):
get_database_connection().execute("create table_2 as select * from table_1")
@pipeline
def my_pipeline():
create_table_2(create_table_1())