考虑这个例子 - 你需要从源数据库加载 table1,做一些通用的转换(比如转换时间戳列的时区)并将结果数据写入雪花。这是一个简单的,可以使用 3 dagster ops 来实现。
现在,假设您需要做同样的事情,但需要使用 100 张桌子。你会怎么用 dagster 做呢?你真的需要创建 100 个工作/图表吗?或者你可以创建一个作业,将执行 100 次?你能限制多少这些工作将同时运行吗?
考虑这个例子 - 你需要从源数据库加载 table1,做一些通用的转换(比如转换时间戳列的时区)并将结果数据写入雪花。这是一个简单的,可以使用 3 dagster ops 来实现。
现在,假设您需要做同样的事情,但需要使用 100 张桌子。你会怎么用 dagster 做呢?你真的需要创建 100 个工作/图表吗?或者你可以创建一个作业,将执行 100 次?你能限制多少这些工作将同时运行吗?
执行此操作有两个主要选项:
使用此设置,您的所有 ETL 将在一个作业中发生。您将拥有一个初始操作,该操作将为您想要为其执行此过程的每个表名称生成一个 DynamicOutput,并将其提供给一组操作(可能组织成一个图表),这些操作将在每个单独的 DynamicOutput 上运行。
根据您使用的执行器,可以限制整体步骤并发(例如,默认的multiprocess_executor支持此选项)。
from dagster import job, op, graph
import pandas as pd
@op(config_schema={"table_name": str})
def extract_table(context) -> pd.DataFrame:
table_name = context.op_config["table_name"]
# do some load...
return pd.DataFrame()
@op
def transform_table(table: pd.DataFrame) -> pd.DataFrame:
# do some transform...
return table
@op(config_schema={"table_name": str})
def load_table(context, table: pd.DataFrame):
table_name = context.op_config["table_name"]
# load to snowflake...
@job
def configurable_etl():
load_table(transform_table(extract_table()))
# this is what the configuration would look like to extract from table
# src_foo and load into table dest_foo
configurable_etl.execute_in_process(
run_config={
"ops": {
"extract_table": {"config": {"table_name": "src_foo"}},
"load_table": {"config": {"table_name": "dest_foo"}},
}
}
)
在这里,您通过为相关操作提供配置模式来创建可以指向源表和目标表的作业。根据这些配置选项(通过运行配置创建运行时提供),您的作业将在不同的源/目标表上运行。
该示例显示使用 python API 显式运行此作业,但如果您从 Dagit 运行它,您还可以在此处输入此配置的 yaml 版本。如果您想简化配置模式(因为它非常嵌套,如图所示),您始终可以创建一个配置映射以使界面更好:)
从这里,您可以通过为您的作业提供唯一标签来限制运行并发,并使用QueuedRunCoordinator来限制该标签的最大并发运行数。