1

我正在尝试使用我编写的函数将 pyspark df 写入 Snowflake:

def s3_to_snowflake(schema, table):

    df = get_dataframe(schema, table, sqlContext)

    username = user
    password = passw
    account = acct

    snowflake_options = {
        "sfURL" : account+".us-east-1.snowflakecomputing.com",
        "sfAccount" : account,
        "sfUser" : username,
        "sfPassword" : password,
        "sfDatabase" : "database",
        "sfSchema" : schema,
        "sfWarehouse" : "demo_wh"
    }

    sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "KeyId")
    sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", 
"AccessKey")

    (
      df
      .write
      .format("net.snowflake.spark.snowflake")
      .mode("overwrite")
      .options(**snowflake_options)
      .option("dbtable", table)
      .option('tempDir', 's3://data-temp-loads/snowflake')
      .save()
    )

    print('Wrote {0} to {1}.'.format(table, schema))

除了我在数据湖中的一个表之外,此功能适用于所有表。这是我要编写的表的架构。

root
|-- credit_transaction_id: string (nullable = true)
|-- credit_deduction_amt: double (nullable = true)
|-- credit_adjustment_time: timestamp (nullable = true)

我得到的错误看起来像 Snowflake 与该 DoubleType 列有关。在使用 Avro/ORC 文件类型时,我在使用 Hive 之前遇到过这个问题。通常是将一种数据类型转换为另一种数据类型的问题。

我尝试过的事情:

另一件需要注意的事情:我已成功传输的一些表具有 DoubleType 列。不确定这张表的问题是什么。

4

1 回答 1

1

在网上闲逛之后,在我看来,这个错误是由 Spark 的 Parquet 阅读器引发的:

https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

您的文件是否定义了dfParquet?我认为这可能是读取错误而不是写入错误;可能值得看看发生了什么get_dataframe

谢谢, etduwx

于 2018-06-10T18:42:04.250 回答