0

在 Dagster 中创建 PartitionSetDefinition 时,您可以传入一个“模式”,该模式将交换所使用的资源(出于测试目的,您可能希望在 PROD 中使用云存储,但使用本地存储进行本地开发

模式要求您指定通常在环境 yaml 文件中提供的一组配置值,但是当您创建如下所示的 PartitionSetDefinition 时,您只能传递模式。这通常通过在管道上设置预设并将其用于运行来完成,但 PartitionSetDefinition 仅允许设置模式而不是预设。

date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
    mode="test"
)

您如何为此提供必要的预设/环境值?

4

1 回答 1

0

我发现这样做的一种方法是在使用 Dagster 提供的一些实用程序为每个分区创建运行配置时将预设加载到运行配置中。在他们的一些单元测试中发现了这一点:

test_base.yaml 具有对应于测试模式的典型预设配置。

from dagster.utils import file_relative_path, load_yaml_from_globs

def run_config_for_date_partition(partition):
    date = partition.value
    config_path = file_relative_path(__file__, os.path.join("../my_pkg/environments/", relative_path))
    config_dict = load_yaml_from_globs(
        config_path("test_base.yaml"),
    )
    table_name = "table1"
    input_config = {"config": {"start_date": date, "table_name": table_name}}
    config_dict["solids"] = {
        "download_snow_incremental_table": {**input_config}
    }
    return config_dict

date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
    mode="test"
)
于 2021-03-09T14:16:22.030 回答