0

我正在使用 spark DataFrames 并尝试对相同模式的 DataFrames 进行重复数据删除。

将 DataFrame 保存到 JSON 之前的架构如下:

root
 |-- startTime: long (nullable = false)
 |-- name: string (nullable = true)

从 JSON 文件加载后的 DataFrame 架构如下:

root
 |-- name: string (nullable = true)
 |-- startTime: long (nullable = false)

我将 JSON 保存为:

newDF.write.json(filePath)

并回读为:

existingDF = sqlContext.read.json(filePath)

做 unionAll 之后

existingDF.unionAll(newDF).distinct()

或除了

newDF.except(existingDF)

由于架构更改,重复数据删除失败。

我可以避免这种模式转换吗?有没有办法在保存到 JSON 文件和从 JSON 文件加载回来时保存(或强制执行)模式序列?

4

1 回答 1

0

实施了一种解决方法,将架构转换回我需要的架构:

val newSchema = StructType(jsonDF.schema.map {
  case StructField(name, dataType, nullable, metadata) if name.equals("startTime") => StructField(name, LongType, nullable = false, metadata)
  case y: StructField => y
})
existingDF = sqlContext.createDataFrame(jsonDF.rdd, newSchema).select("startTime", "name")
于 2016-01-16T13:47:23.720 回答