2

我正在为可能的用例评估 DBT,除了一种情况外,一切似乎都很好。这是源表具有结构字段的时候。

我正在使用Spark Thrift Server connector,基础数据存储parquetS3. DBT 版本为 0.20

这是源表创建语句的一部分,因为您可以看到那里有结构字段。

CREATE TABLE `<someSchema>`.`<sourceTable>` (
  `properties` STRUCT<`site`: STRING>,
  `channel` STRING,
  `timestamp` STRING,
  `anotherDate` STRING,
  `aDate ` STRING)
  USING parquet
  PARTITIONED BY (aDate)
  LOCATION 's3a://<someBucket>'

我的模型只是使用某些 where 子句对该表执行选择。第一次运行时,它工作得很好,它创建了一个与原始表完全相同的表,只是做了一些细微的更改,正如预期的那样,即使是结构字段也是如此。

这是一张sink table create table

CREATE TABLE `<someSchema>`.`dbtsink` (
      `properties` STRUCT<`site`: STRING>,
      `channel` STRING,
      `timestamp` STRING,
      `anotherDate ` STRING,
      `aDate` STRING)
USING parquet
PARTITIONED BY (anotherDate)

当我在 where 子句中使用其他一些值再次运行 dbt 时,我的问题出现了,它应该在接收器表中创建另一个分区。查询编译就好了

它引发了这个错误:

Runtime Error in model dbtsink (models/anotherDate/dbtsink.sql)
  Database Error
    Error running query: org.apache.spark.sql.AnalysisException: cannot resolve '`site`' given input columns: [dbtsink__dbt_tmp.channel, dbtsink__dbt_tmp.anotherDate, dbtsink__dbt_tmp.aDate, dbtsink__dbt_tmp.properties, dbtsink__dbt_tmp.timestamp]; line 4 pos 25;
    'InsertIntoStatement 'UnresolvedRelation [someSchema, dbtsink], false, false
    +- 'Project [properties#6526, 'site, channel#6527, timestamp#6528, aDate#6541, anotherDate#6540]
       +- SubqueryAlias dbtsink__dbt_tmp
          +- Project [properties#6526, channel#6527, timestamp#6528, anotherDate#6540, aDate#6541]
             +- Filter (((aDate#6541 > 2021060100) AND (aDate#6541 <= 2021070609)) AND (anotherDate#6540 = 2021070609))
                +- SubqueryAlias spark_catalog.someSchema.sourceTable
                   +- Relation[context#6524,traits#6525,properties#6526,channel#6527,timestamp#6528,projectId#6529,integrations#6530,messageId#6531,originalTimestamp#6532,receivedAt#6533,sentAt#6534,userId#6535,anonymousId#6536,type#6537,providerId#6538,version#6539,anotherDate#6540,aDate#6541] parquet

似乎它正在尝试将结构的内部字段作为根字段读取或写入。我用其他结构字段进行了测试,它发生了同样的情况,我只想要这样的结构,就像在第一次执行时一样。正如我所说,它只发生在第二次执行中。

这是我模型的查询,很简单

select 
properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'

如果我更改选择以将属性从 struct 转换为 json,to_json(properties)它会按预期工作,生成一个新分区。

如果 DBT 中的结构有问题?我做错了什么?

我正在使用增量实现,并使用 append 和 insert_overwrite 对其进行了测试,这似乎不是问题

4

1 回答 1

1

该问题与 DBT 如何尝试使用REGEX解析 Spark 表中的列有关。请参阅parse_columns_from_information函数。

您不能使用 REGEX 来解析表模式。该函数正在使用此 Spark SQL 语句提供的结果:show table extended in someSchema like '*'. 使用该语句时,您的表架构会得到类似的结果:

Schema: root
 |-- properties: struct (nullable = true)
 |    |-- site: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- anotherDate: string (nullable = true)
 |-- aDate: string (nullable = true)

正如您所经历的那样,将该REGEX应用于上述字符串会弄乱您的列。

您可以使用parse_describe_extended解决此问题。此函数正在使用此 Spark SQL 语句提供的结果:describe extended someSchema.dbtsink. 为了使用 parse_describe_extended,您需要禁用 DBT 缓存(可能是有害的) 要禁用 DBT 缓存,您可以使用此 dbt 参数--bypass-cache

于 2021-08-10T16:07:41.177 回答