我有一个类似于 Qubole 的表格:
use dm;
CREATE EXTERNAL TABLE IF NOT EXISTS fact (
id string,
fact_attr struct<
attr1 : String,
attr2 : String
>
)
STORED AS PARQUET
LOCATION 's3://my-bucket/DM/fact'
我在 Snowflake 中创建了并行表,如下所示:
CREATE TABLE IF NOT EXISTS dm.fact (
id string,
fact_attr variant
)
我的 ETL 过程将数据加载到 qubole 表中,例如:
+------------+--------------------------------+
| id | fact_attr |
+------------+--------------------------------+
| 1 | {"attr1": "a1", "attr2": "a2"} |
| 2 | {"attr1": "a3", "attr2": null} |
+------------+--------------------------------+
我正在尝试使用 Merge 命令将此数据同步到雪花,例如
MERGE INTO DM.FACT dst USING %s src
ON dst.id = src.id
WHEN MATCHED THEN UPDATE SET
fact_attr = parse_json(src.fact_attr)
WHEN NOT MATCHED THEN INSERT (
id,
fact_attr
) VALUES (
src.id,
parse_json(src.fact_attr)
);
我正在使用 PySpark 同步数据:
df.write \
.option("sfWarehouse", sf_warehouse) \
.option("sfDatabase", sf_database) \
.option("sfSchema", sf_schema) \
.option("postactions", query) \
.mode("overwrite") \
.snowflake("snowflake", sf_warehouse, sf_temp_table)
使用上面的命令,我收到以下错误:
pyspark.sql.utils.IllegalArgumentException: u"Don't know how to save StructField(fact_attr,StructType(StructField(attr1,StringType,true), StructField(attr2,StringType,true)),true) of type attributes to Snowflake"
我已阅读以下链接但没有成功:
问题:
如何将具有 STRUCT 字段的 Qubole Hive 表中的数据插入/同步到雪花?