据我所知....不幸的是,直到今天,它还没有以声明的方式在 spark 的 xml 包中可用...以您期望的方式...
Json 它一直在工作,因为FailureSafeParser
它的实现如下所示......在DataFrameReader
/**
* Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
* text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
*
* Unless the schema is specified using `schema` function, this function goes through the
* input once to determine the input schema.
*
* @param jsonDataset input Dataset with one JSON object per record
* @since 2.2.0
*/
def json(jsonDataset: Dataset[String]): DataFrame = {
val parsedOptions = new JSONOptions(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val schema = userSpecifiedSchema.getOrElse {
TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
}
ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
schema,
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
}
您可以实现功能编程方式。
使用 . 读取文件夹中的所有文件sc.textFile
。foreach 文件使用 xml 解析器解析条目。
如果其有效重定向到另一个路径。
如果无效,则写入坏记录路径。