3

首先,我是 dbt 的支持者!我喜欢这个工具和它的多功能性。

在阅读一些文档时,我注意到每次调用宏时我都可以在我的模式上做一些元工作。

其中之一是清理模式。

这已根据 dbt slack 中的讨论进行了编辑

  1. dbt run-operation freeze这将自省所有将使用 dbt run 但使用自动生成的哈希(可能只是时间戳)写入的表。它会以我选择的模式输出这些表,并将“哈希”记录到控制台。

  2. dbt run-operation unfreeze --args '{hash: my_hash}'然后将继续查找使用该哈希前缀编写的表并将它们从架构中清除。

4

1 回答 1

11

我在旧版本的 dbt 中创建了这样的宏,它仍然适用于 0.17.1。

下面的宏从一个单独的宏(也在下面)item_in_list_query中获取列表。然后在内部连接该表列表以组成所需的 SQL 查询并执行它。为了演示,还使用了一个模型。tablesget_tablesitem_in_list_queryitem_in_list_query

item_in_list_query

{% macro item_in_list_query() %}

    {% set tables = get_tables() %}

    {{ log("Tables: " ~ tables, True) }}

    {% set query %}
        select id
        from my_tables
        {% if tables -%}
            where lower(table_name) in {% for t in tables -%} {{ t }} {%- endfor -%}
        {%- endif -%}
    {% endset %}

    {{ log("query: " ~ query, True) }}

    {# run_query returns agate.Table (https://agate.readthedocs.io/en/1.6.1/api/table.html). #}
    {% set results = run_query(query) %}

    {{ log("results: " ~ results, True) }}

    {# execute is a Jinja variable that returns True when dbt is in "execute" mode i.e. True when running dbt run but False during dbt compile. #}
    {% if execute %}
    {# agate.table.rows is agate.MappedSequence in which data that can be accessed either by numeric index or by key. #}
    {% set results_list = results.rows %}
    {% else %}
    {% set results_list = [] %}
    {% endif %}

    {{ log("results_list: " ~ results_list, True) }}
    {{ return(results_list) }}

{% endmacro %}

获取表

{% macro get_tables() %}
      {%- set tables = [
          ('table1', 'table2')
      ] -%}
  {{return(tables )}}
{% endmacro %}

模型

{%- for item in item_in_list_query() -%}
  {%- if not loop.first %} UNION ALL {% endif %}
  select {{ item.id }}
{%- endfor -%}

于 2020-08-19T07:21:41.767 回答