0

如何将以下值从多个 XML 文件转换为 spark 数据框:

  • 属性Id0来自Level_0
  • Date/ValueLevel_4

所需输出:

+----------------+-------------+---------+
|Id0             |Date         |Value    |
+----------------+-------------+---------+
|Id0_value_file_1|  2021-01-01 |   4_1   |
|Id0_value_file_1|  2021-01-02 |   4_2   |
|Id0_value_file_2|  2021-01-01 |   4_1   |
|Id0_value_file_2|  2021-01-02 |   4_2   |
+----------------+-------+---------------+

文件_1.xml:

<Level_0 Id0="Id0_value_file1">
  <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
    <Level_2_A>A</Level_2_A>
    <Level_2>
      <Level_3>
        <Level_4>
          <Date>2021-01-01</Date>
          <Value>4_1</Value>
        </Level_4>
        <Level_4>
          <Date>2021-01-02</Date>
          <Value>4_2</Value>
        </Level_4>
      </Level_3>
    </Level_2>
  </Level_1>
</Level_0>

文件_2.xml:

<Level_0 Id0="Id0_value_file2">
  <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
    <Level_2_A>A</Level_2_A>
    <Level_2>
      <Level_3>
        <Level_4>
          <Date>2021-01-01</Date>
          <Value>4_1</Value>
        </Level_4>
        <Level_4>
          <Date>2021-01-02</Date>
          <Value>4_2</Value>
        </Level_4>
      </Level_3>
    </Level_2>
  </Level_1>
</Level_0>

当前代码示例:

files_list = ["file_1.xml", "file_2.xml"]
df = (spark.read.format('xml')
           .options(rowTag="Level_4")
           .load(','.join(files_list))

当前输出:(Id0缺少属性的列)

+-------------+---------+
|Date         |Value    |
+-------------+---------+
|  2021-01-01 |     4_1 |
|  2021-01-02 |     4_2 |
|  2021-01-01 |     4_1 |
|  2021-01-02 |     4_2 |
+-------+---------------+

有一些示例,但没有一个可以解决问题:-我正在使用 databricks spark_xml - https://github.com/databricks/spark-xml -有一个示例,但没有属性读取,在 spark 中读取 XML使用 sparkxml 从 xml 中提取标签属性

编辑: 正如@mck 正确指出的那样, <Level_2>A</Level_2>XML 格式不正确。我的示例中有一个错误(现在更正了 xml 文件),应该是<Level_2_A>A</Level_2_A>. 之后,建议的解决方案甚至适用于多个文件。

注意:为了加速加载大量 xml 定义架构,如果没有定义架构,则在创建数据帧以干扰架构时,火花正在读取每个文件...更多信息:https ://szczeles.github.io/Reading-JSON- CSV-and-XML-files-efficiently-in-Apache-Spark/

步骤1):

 files_list = ["file_1.xml", "file_2.xml"]
 # for schema seem NOTE above

 df = (spark.read.format('xml')
               .options(rowTag="Level_0")
               .load(','.join(files_list),schema=schema))
df.printSchema()

root
 |-- Level_1: struct (nullable = true)
 |    |-- Level_2: struct (nullable = true)
 |    |    |-- Level_3: struct (nullable = true)
 |    |    |    |-- Level_4: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- Date: string (nullable = true)
 |    |    |    |    |    |-- Value: string (nullable = true)
 |    |-- Level_2_A: string (nullable = true)
 |    |-- _Id1_1: string (nullable = true)
 |    |-- _Id_2: string (nullable = true)
 |-- _Id0: string (nullable = true

第 2 步)见下面@mck 解决方案:

4

1 回答 1

1

您可以Level_0用作 rowTag,并分解相关的数组/结构:

import pyspark.sql.functions as F

df = spark.read.format('xml').options(rowTag="Level_0").load('line_removed.xml')

df2 = df.select(
    '_Id0', 
    F.explode_outer('Level_1.Level_2.Level_3.Level_4').alias('Level_4')
).select(
    '_Id0',
    'Level_4.*'
)

df2.show()
+---------------+----------+-----+
|           _Id0|      Date|Value|
+---------------+----------+-----+
|Id0_value_file1|2021-01-01|  4_1|
|Id0_value_file1|2021-01-02|  4_2|
+---------------+----------+-----+
于 2021-01-01T08:39:04.307 回答