3

我有以下问题:

  • 我有一个 AWS S3 管道,每天都会吐出一个 json.gz 文件。
  • 我希望用 dbt 将该文件放入雪花中(不使用雪管 atm)

我已经设法通过创建存储集成来做到这一点,并且我已经使用我的角色(用于运行 dbt)手动创建了一个模式并评估该模式的使用情况。到现在为止还挺好。

然后我读到了这个:

https://github.com/fishtown-analytics/dbt-external-tables

问题是这是正确运行的唯一方法,我必须更改我的 dbt profiles.yml,将默认模式设置为 S3_MIXPANEL 和默认数据库 RAW_DEV,使用 --target 'ingest_dev' 参数运行不同的目标和角色.

我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用类似 {{ source() }} 的东西,这样我就可以以某种方式指出我的文档这是一个外部来源。我认为这个 dbt-external-tables 并没有很好地解释我的情况?

请任何人都可以帮助我并分享如何正确地从外部阶段创建模式和查询,而无需每次都更改默认模式宏和 dbtprofiles.yml?

我已成功运行以下代码:

{{
  config(
    materialized ='incremental',
    schema = generate_schema_name('S3_MIXPANEL')
  )
}}
 
  SELECT
    metadata$filename as file_name,
    to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
    $1 as payload,
    CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz($1:properties:mp_processing_time_ms::int / 1000)) as  event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at

 from

    @my_s3_stage

    
{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date>(
    SELECT
        max(event_date)
    FROM
        {{ this }}
    )
{% endif %}

{{ row_limit() }} 

编辑 22-06-20:

我已经在我的模型中添加了 src_mixpanel.yml 文件并运行了 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,也许是愚蠢的问题,但我真的不知道如何安装你的包,所以我手动将你的所有宏添加到我的包中)。

现在当我运行这段代码时:

dbt run-operation stage_external_sources

version: 2

sources:

  - name: s3_mixpanel
    database: RAW_DEV
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
              data_type: date
            - name: file_name
              expression: metadata$filename
              data_type: string
          columns:
            - name: properties
              data_type: variant

我收到一个错误:

运行操作时遇到错误:宏 stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'中的编译错误

4

1 回答 1

7

作为dbt-external-tables包的维护者,我将分享它的观点。该软件包认为您应该将所有外部源(S3 文件)作为外部表或首先使用雪管,在一个包含尽可能少的混淆逻辑的过程中。然后,您可以从它们中选择,作为 dbt 模型中的源,以及所有必需的业务逻辑。

如果我的理解是正确的,您将在一个名为(例如)models/staging/mixpanel/src_mixpanel.yml的文件中暂存您的 mixpanel 数据,如下所示:

version: 2

sources:

  - name: s3_mixpanel
    database: raw_dev
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          file_format: "( type = json )"  # or a named file format
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
          columns:
            - name: properties
              data_type: variant

你可以从包中运行这个宏来创建外部表——如果你没有auto_refresh启用,在创建之后更新它的分区元数据(参见 Snowflake文档):

dbt run-operation stage_external_sources

然后,您可以在增量模型中从此源中进行选择,就像您上面的模型一样。现在,event_date是这个外部表上的一个分区列,因此对其进行过滤应该使 Snowflake 能够修剪文件(尽管这在历史上对于动态的、子查询派生的过滤器来说是不一致的)。

{{
  config(
    materialized ='incremental'
  )
}}
 
  SELECT
    metadata$filename as file_name,
    event_date,
    value as payload,
    properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at

 from {{ source('s3_mixpanel', 'events' }} 

    
{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date >(
    SELECT
        max(event_date)
    FROM
        {{ this }}
    )
{% endif %}

{{ row_limit() }}
于 2020-07-20T13:06:59.890 回答