3

我正在尝试使用以下 Pyspark 代码从 Azure Data Lake 读取 Parquet 文件。

df= sqlContext.read.format("parquet")
   .option("header", "true")
   .option("inferSchema", "true")
   .load("adl://xyz/abc.parquet")
df = df['Id','IsDeleted']

现在我想使用以下代码将此数据帧 df 作为表加载到 sql 数据仓库中:

df.write \
  .format("com.databricks.spark.sqldw") \
  .mode('overwrite') \
  .option("url", sqlDwUrlSmall) \
  .option("forward_spark_azure_storage_credentials", "true") \
  .option("dbtable", "test111") \
  .option("tempdir", tempDir) \
  .save()

这将在 SQL 数据仓库中创建一个表 dbo.test111,其数据类型为:

  • 标识(nvarchar(256),空)
  • IsDeleted(位,空)

但我需要这些具有不同数据类型的列,例如 SQL 数据仓库中的 char(255)、varchar(128)。在将数据框加载到 SQL Dataware house 时如何执行此操作?

4

2 回答 2

0

给出了 Spark SQL 上唯一支持的数据类型 [ https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/types/package-summary.html][1]

字符串类型实际上会变成VARCHAR未指定长度的类型。Spark SQL 没有VARCHAR(n)数据类型。

您应该能够执行以下操作

import org.apache.spark.sql.types._

val df = 
          df.withColumn("Id_mod", df.Id.cast(StringType)) 
            .withColumn("IsDeleted_mod", df.IsDeleted.cast(StringType))
            .drop("Id")
            .drop("IsDeleted")
            .withColumnRenamed("Id_mod", "Id")
            .withColumnRenamed("IsDeleted_mod", "IsDeleted")
            //Replace StringType with Any supported desired type
于 2019-01-16T03:40:34.690 回答
0

我找到了一种方法可以帮助您修改列数据类型,但可能无法实现您想要的。

df.select(col("colname").cast(DataType))

这是关于如何更改 Spark SQL 的 DataFrame 中的列类型的 blob

也许这可以帮助你。

于 2019-01-16T03:09:10.433 回答