1

使用 spark,我试图从路径中读取一堆 xml,其中一个文件是一个不是 xml 的虚拟文件。

我希望火花告诉我一个特定的文件是无效的,无论如何

添加“badRecordsPath”otiton 将坏数据写入 JSON 文件的指定位置,但同样不适用于 xml,还有其他方法吗?

df = (spark.read.format('json')
      .option('badRecordsPath','/tmp/data/failed')
      .load('/tmp/data/dummy.json')
4

1 回答 1

1

据我所知....不幸的是,直到今天,它还没有以声明的方式在 spark 的 xml 包中可用...以您期望的方式...

Json 它一直在工作,因为FailureSafeParser它的实现如下所示......在DataFrameReader

/**
   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
   *
   * Unless the schema is specified using `schema` function, this function goes through the
   * input once to determine the input schema.
   *
   * @param jsonDataset input Dataset with one JSON object per record
   * @since 2.2.0
   */
  def json(jsonDataset: Dataset[String]): DataFrame = {
    val parsedOptions = new JSONOptions(
      extraOptions.toMap,
      sparkSession.sessionState.conf.sessionLocalTimeZone,
      sparkSession.sessionState.conf.columnNameOfCorruptRecord)

    val schema = userSpecifiedSchema.getOrElse {
      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
    }

    ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
    val actualSchema =
      StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

    val createParser = CreateJacksonParser.string _
    val parsed = jsonDataset.rdd.mapPartitions { iter =>
      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
      val parser = new FailureSafeParser[String](
        input => rawParser.parse(input, createParser, UTF8String.fromString),
        parsedOptions.parseMode,
        schema,
        parsedOptions.columnNameOfCorruptRecord)
      iter.flatMap(parser.parse)
    }
    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
  }

您可以实现功能编程方式。
使用 . 读取文件夹中的所有文件sc.textFile。foreach 文件使用 xml 解析器解析条目。

如果其有效重定向到另一个路径。

如果无效,则写入坏记录路径。

于 2019-06-13T20:49:46.077 回答