4

假设我创建了一个具有以下实体的 Dagster 管道:

  1. 从文件执行 SQL 查询并获取结果
  2. 将结果写入表

我想同时为 10 个不同的表执行此操作。每个表都需要不同的 SQL 查询。最好的方法是什么?

4

1 回答 1

6

一种方法是使用实​​体工厂。run_query_solid_factory并且write_results_solid_factory是实体工厂,它们接受输入(例如名称和查询或表)并返回可以在管道中运行的实体。summary_report在打印出摘要信息之前等待所有上游实体完成。

def run_query_solid_factory(name, query):
    @solid(name=name)
    def _run_query(context):
        context.log.info(query)
        return 'result'

    return _run_query

def write_results_solid_factory(name, table):
    @solid(name=name)
    def _write_results(context, query_result):
        context.log.info(table)
        context.log.info(query_result)
        return 'success'

    return _write_results

@solid
def summary_report(context, statuses):
    context.log.info(' '.join(statuses))

@pipeline
def pipeline():
    solid_output_handles = []
    queries = [('table', 'query'), ('table2', 'query2')]
    for table, query in queries:
        get_data = run_query_solid_factory('run_query_{}'.format(query), query)
        write_results = write_results_solid_factory('write_results_to_table_{}'.format(table), table)
        solid_output_handles.append(write_results(get_data()))

    summary_report(solid_output_handles)

达格结构 Dag 执行日志

上一个答案:

我建议创建一个composite_solid,它由一个处理(1) 的实体和一个处理(2) 的实体组成。然后,您可以为 10 个表中的每一个表命名一次composite_solid ,这样您就可以通过 config 传入 SQL 查询(请参阅教程

于 2020-05-01T22:11:28.740 回答