1

我正在使用XML来自databricks. 这是我的XML示例数据。

<ds Name="abc">
   <node begin="18" end="22" val="Organic" type="type1">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="22" end="23" val="Cereal">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="23" end="25" val="Kellogs" type="type2">
      <hs id="0" begin="18" end="91" />
   </node>
   <node begin="22" end="23" val="Harry" type="type1">
      <hs id="1" begin="108" end="520" />
   </node>
   <node begin="23" end="25" val="Potter" type="type1">
      <hs id="1" begin="108" end="520" />
   </node>
</ds>

我想将所有node.val按它们在XML文件中出现的顺序)组合在一起hs id

例如,上述数据的 o/p 应为:

名称 hs id Val

abc 0 有机谷物

abc 1 哈利波特

这是我从 databricks 加载 XML 源的地方:

val df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag", "ds")
.option("attributePrefix", "")
.load(args(0))

df.registerTempTable("ds")

我不确定如何按 对数据集进行分组hs id,并确保保留顺序。

val  df_ds = sqlContext.sql("SELECT Name, node.type from ds")
4

1 回答 1

1

尝试:

import scala.collection.mutable.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val comb = udf((rows: Seq[Row]) => {
  val result = LinkedHashMap[Long, Array[String]]()
  for (row <- rows) {
     val id = row.getAs[Row]("hs").getAs[Long]("id")
     result(id) = result.getOrElse(id, Array[String]()) :+ row.getAs[String]("val")
  }
  result.values.toArray.map(x => x.mkString(" "))
})

df.printSchema
root
 |-- Name: string (nullable = true)
 |-- node: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- begin: long (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- hs: struct (nullable = true)
 |    |    |    |-- #VALUE: string (nullable = true)
 |    |    |    |-- begin: long (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- val: string (nullable = true)

df.withColumn("comb", comb(df("node")))
于 2016-08-08T15:20:29.057 回答