1

我正在构建一个从多个模式中提取数据的数据管道。我正在尝试使用 jinja 来获取模式名称列表,然后在将数据组合到单个临时表之前,在所有这些模式中查询表中的特定列(具有相同的名称)。

在概念层面上,我知道我需要使用 Jinja 创建两个列表:一个包含schema我想要循环的名称的列表,以及一个我想要在所有模式中存在的表中查询的列名列表我在查询。让我们调用模式名称schema_names列表和列名称列表column_names

  • {% set var schema_names=('brand1', 'brand2', 'brand3') %}
  • {% set var column_names=('col1', 'col2', 'col3') %}

从理论上讲,下一步似乎是我应该通过 Jinja 将这些值传递到一个查询中,该查询从schema_vars. 如果我手动编写 SQL 查询,我会使用几个WITH语句,然后使用 a 组合UNION ALL

/* Query that creates a variable with value `brand1` while querying `brand1` schema */
WITH schema1_data
       AS (
            SELECT 'brand1' AS schema_name,
                   col1,
                   col2,
                   col3
            FROM brand1.table_name
  ),

/* Query that creates a variable with value `brand2` while querying `brand2` schema */
     schema2_data
       AS (
            SELECT 'brand2' AS schema_name,
                   col1,
                   col2,
                   col3
            FROM brand2.table_name
     ),

  /* Query that creates a variable with value `brand3` while querying `brand3` schema */
     schema3_data
       AS (
            SELECT 'brand3' AS schema_name,
                   col1,
                   col2,
                   col3
            FROM brand3.table_name
     ),
  /* Union statement combining identical tables from 3 schemas */
     combined_schemas
       AS (
            SELECT *
            FROM schema1_data
            UNION ALL
            (
              SELECT *
              FROM schema2_data)
            UNION ALL
            (
              SELECT *
              FROM schema3_data)
     )

SELECT *
FROM combined_schemas

关于如何在 Jinja/dbt 中最好地实现这一点的想法?也对问题的替代框架持开放态度,这些框架采用不同的方法将来自不同模式中相同表的信息组合到单个暂存表中,以由数据管道处理。

4

1 回答 1

1

您的 UNION ALL 方法听起来不错。您可以稍微简化您的计划:如果您想要的每个模型的列相同,那么您不需要该column_names变量。此外,仅当您手动编写时,使用 CTE 才有帮助,但使用 jinja 模板,它实际上会让您的生活更加艰难,所以不要强迫自己使用它们。

你会需要:

  • 此处记录的jinjafor 循环。
  • loop.last在 for 循环块中访问的特殊变量

我还没有测试过以下代码,但我会这样写:

{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% for schema in schema_names %}
(
  select
    '{{ schema }}' as schema_name,
    col1,
    col2,
    col3
  from {{ schema }}.table_name
)
{% if not loop.last %}
union all
{% endif %}
{% endfor %}

如果您不想自己编写代码,dbt_utils是否已经实现了此功能:union_relations.

虽然,您可以参考它们的实现(比您需要的更复杂,因为它们处理更多极端情况),您可以在此处找到它们会很有趣。

于 2020-11-18T10:10:20.187 回答