1

我正在将一个集合从 MongodB 导入到 Spark。所有文档都具有字段“数据”,该字段又是一个结构,并具有字段“配置名称”(始终为空)。

val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()

对于结果中的dataDataFrame,我得到这种类型:

StructType(StructField(configurationName,NullType,true), ...

当我尝试将数据框保存为 Parquet

partitionDF.write.mode("overwrite").parquet(collectionName + ".parquet")

我收到以下错误:

AnalysisException: Parquet 数据源不支持 struct<configurationName:null, ...

看起来问题是我把它NullType埋在了data列的类型中。我正在研究如何从 Spark 写入 parquet 时处理空值,但它只显示了如何NullType在顶级列上解决此问题。

NullType但是当 a不在顶层时,如何解决这个问题呢?到目前为止,我唯一的想法是完全展平数据框(爆炸数组等),然后所有的NullTypes 都会在顶部弹出。但在这种情况下,我会丢失数据的原始结构(我不想丢失)。

有更好的解决方案吗?

4

2 回答 2

1

基于如何在从 Spark 写入镶木地板时处理空值以及如何传递模式以从现有数据帧创建新数据帧?(第二个是@pasha701 建议的,谢谢!),我构建了这个:

def denullifyStruct(struct: StructType): StructType = {
  val items = struct.map{ field => StructField(field.name, denullify(field.dataType), field.nullable, field.metadata) }
  StructType(items)
}

def denullify(dt: DataType): DataType = {
  dt match {
    case struct: StructType => denullifyStruct(struct)
    case array: ArrayType => ArrayType(denullify(array.elementType), array.containsNull)
    case _: NullType => StringType
    case _ => dt
  }
}

它有效地用一个替换所有NullType实例StringType

接着

val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))
fixedDF.printSchema
于 2021-08-12T10:31:50.113 回答
1

@Roman Puchkovskiy:使用模式匹配重写你的函数。

  def deNullifyStruct(struct: StructType): StructType = {
    val items = struct.map { field => StructField(field.name, fixNullType(field.dataType), field.nullable, field.metadata) }
    StructType(items)
  }
  def fixNullType(dt: DataType): DataType = {
    dt match {
      case _: StructType => return deNullifyStruct(dt.asInstanceOf[StructType])
      case _: ArrayType =>
        val array = dt.asInstanceOf[ArrayType]
        return ArrayType(fixNullType(array.elementType), array.containsNull)
      case _: NullType => return StringType
      case _ => return dt
    }
  }
于 2021-08-12T13:40:42.910 回答