0

在 dagster 教程的Materializiations 部分中,我们为中间输出选择一个文件名 ( sorted_cereals_csv_path),然后将其作为物化输出:

@solid
def sort_by_calories(context, cereals):

    # Sort the data (removed for brevity)

    sorted_cereals_csv_path = os.path.abspath(
        'calories_sorted_{run_id}.csv'.format(run_id=context.run_id)
    )
    with open(sorted_cereals_csv_path, 'w') as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
    yield Materialization(
        label='sorted_cereals_csv',
        description='Cereals data frame sorted by caloric content',
        metadata_entries=[
            EventMetadataEntry.path(
                sorted_cereals_csv_path, 'sorted_cereals_csv_path'
            )
        ],
    )
    yield Output(None)

然而,这依赖于我们可以使用本地文件系统(这可能不是真的)这一事实,它可能会被以后的运行覆盖(这不是我想要的)并且它也迫使我们想出一个文件名永远不会被使用。

我想在我的大部分实体中做的只是说“这是一个文件对象,请为我存储它”,而不用关心的存储位置。我可以在不考虑所有这些事情的情况下实现文件吗?我应该tempfile为此使用python的工具吗?

4

1 回答 1

1

实际上,这似乎在output_materialization示例中得到了回答。

你基本上定义了一个类型:

@usable_as_dagster_type(
    name='LessSimpleDataFrame',
    description='A more sophisticated data frame that type checks its structure.',
    input_hydration_config=less_simple_data_frame_input_hydration_config,
    output_materialization_config=less_simple_data_frame_output_materialization_config,
)
class LessSimpleDataFrame(list):
    pass

这种类型有一个读取配置的 output_materialization 策略:

def less_simple_data_frame_output_materialization_config(
    context, config, value
):
    csv_path = os.path.abspath(config['csv']['path'])

    # Save data to this path

您在配置中指定此路径:

    execute_pipeline(
        output_materialization_pipeline,
        {
            'solids': {
                'sort_by_calories': {
                    'outputs': [
                        {'result': {'csv': {'path': 'cereal_out.csv'}}}
                    ],
                }
            }
        },
    )

您仍然必须为每个中间输出提供一个文件名,但您可以在配置中进行,每次运行可能会有所不同,而不是在管道本身中定义它。

于 2020-02-20T01:46:37.247 回答