我正在构建一个从多个模式中提取数据的数据管道。我正在尝试使用 jinja 来获取模式名称列表,然后在将数据组合到单个临时表之前,在所有这些模式中查询表中的特定列(具有相同的名称)。
在概念层面上,我知道我需要使用 Jinja 创建两个列表:一个包含schema
我想要循环的名称的列表,以及一个我想要在所有模式中存在的表中查询的列名列表我在查询。让我们调用模式名称schema_names
列表和列名称列表column_names
。
{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% set var column_names=('col1', 'col2', 'col3') %}
从理论上讲,下一步似乎是我应该通过 Jinja 将这些值传递到一个查询中,该查询从schema_vars
. 如果我手动编写 SQL 查询,我会使用几个WITH
语句,然后使用 a 组合UNION ALL
:
/* Query that creates a variable with value `brand1` while querying `brand1` schema */
WITH schema1_data
AS (
SELECT 'brand1' AS schema_name,
col1,
col2,
col3
FROM brand1.table_name
),
/* Query that creates a variable with value `brand2` while querying `brand2` schema */
schema2_data
AS (
SELECT 'brand2' AS schema_name,
col1,
col2,
col3
FROM brand2.table_name
),
/* Query that creates a variable with value `brand3` while querying `brand3` schema */
schema3_data
AS (
SELECT 'brand3' AS schema_name,
col1,
col2,
col3
FROM brand3.table_name
),
/* Union statement combining identical tables from 3 schemas */
combined_schemas
AS (
SELECT *
FROM schema1_data
UNION ALL
(
SELECT *
FROM schema2_data)
UNION ALL
(
SELECT *
FROM schema3_data)
)
SELECT *
FROM combined_schemas
关于如何在 Jinja/dbt 中最好地实现这一点的想法?也对问题的替代框架持开放态度,这些框架采用不同的方法将来自不同模式中相同表的信息组合到单个暂存表中,以由数据管道处理。