1

我有一个类似于 Qubole 的表格:

use dm;

CREATE EXTERNAL TABLE IF NOT EXISTS fact (
    id string,
    fact_attr struct<
        attr1 : String,
        attr2 : String
    >
)
STORED AS PARQUET
LOCATION 's3://my-bucket/DM/fact'

我在 Snowflake 中创建了并行表,如下所示:

CREATE TABLE IF NOT EXISTS dm.fact (
    id string,
    fact_attr variant
)

我的 ETL 过程将数据加载到 qubole 表中,例如:

+------------+--------------------------------+
| id         | fact_attr                      |
+------------+--------------------------------+
| 1          | {"attr1": "a1", "attr2": "a2"} |
| 2          | {"attr1": "a3", "attr2": null} |
+------------+--------------------------------+

我正在尝试使用 Merge 命令将此数据同步到雪花,例如

MERGE INTO DM.FACT dst USING %s src 
    ON dst.id = src.id
WHEN MATCHED THEN UPDATE SET
    fact_attr = parse_json(src.fact_attr)
WHEN NOT MATCHED THEN INSERT (
    id,
    fact_attr
) VALUES (
    src.id,
    parse_json(src.fact_attr)
);

我正在使用 PySpark 同步数据:

df.write \
  .option("sfWarehouse", sf_warehouse) \
  .option("sfDatabase", sf_database) \
  .option("sfSchema", sf_schema) \
  .option("postactions", query) \
  .mode("overwrite") \
  .snowflake("snowflake", sf_warehouse, sf_temp_table)

使用上面的命令,我收到以下错误:

pyspark.sql.utils.IllegalArgumentException: u"Don't know how to save StructField(fact_attr,StructType(StructField(attr1,StringType,true), StructField(attr2,StringType,true)),true) of type attributes to Snowflake"

我已阅读以下链接但没有成功:

问题:

如何将具有 STRUCT 字段的 Qubole Hive 表中的数据插入/同步到雪花?

4

1 回答 1

0

尝试此操作时使用的 Spark Connector for Snowflake 版本缺乏对变体数据类型的支持。

在其连接器版本 2.4.4(2018 年 7 月发布)中引入了支持,其中 StructType 字段现在自动映射到可与您的 MERGE 命令一起使用的 VARIANT 数据类型。

于 2020-01-16T05:14:15.680 回答