我正在尝试使用我编写的函数将 pyspark df 写入 Snowflake:
def s3_to_snowflake(schema, table):
df = get_dataframe(schema, table, sqlContext)
username = user
password = passw
account = acct
snowflake_options = {
"sfURL" : account+".us-east-1.snowflakecomputing.com",
"sfAccount" : account,
"sfUser" : username,
"sfPassword" : password,
"sfDatabase" : "database",
"sfSchema" : schema,
"sfWarehouse" : "demo_wh"
}
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "KeyId")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
"AccessKey")
(
df
.write
.format("net.snowflake.spark.snowflake")
.mode("overwrite")
.options(**snowflake_options)
.option("dbtable", table)
.option('tempDir', 's3://data-temp-loads/snowflake')
.save()
)
print('Wrote {0} to {1}.'.format(table, schema))
除了我在数据湖中的一个表之外,此功能适用于所有表。这是我要编写的表的架构。
root
|-- credit_transaction_id: string (nullable = true)
|-- credit_deduction_amt: double (nullable = true)
|-- credit_adjustment_time: timestamp (nullable = true)
我得到的错误看起来像 Snowflake 与该 DoubleType 列有关。在使用 Avro/ORC 文件类型时,我在使用 Hive 之前遇到过这个问题。通常是将一种数据类型转换为另一种数据类型的问题。
我尝试过的事情:
- 铸造(双浮点数,双精度字符串,双精度数字——根据雪花文档的最后一个)
- 重新运行传入表的 DDL,尝试 Float、String 和 Numeric 类型
另一件需要注意的事情:我已成功传输的一些表具有 DoubleType 列。不确定这张表的问题是什么。