-1
val rdd = df.rdd.map(line => Row.fromSeq((
        scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(1)).child
        .filter(elem =>
               elem.label == "name1" 
            || elem.label == "name2" 
            || elem.label == "name3"  
            || elem.label == "name4" 

        ).map(elem => (elem.label -> elem.text)).toList)
    )

我这样做 了rdd.take(10).foreach(println)RDD[Row]然后我的输出如下所示:

[(name1, value1), (name2, value2),(name3, value3)]
[(name1, value11), (name2, value22),(name3, value33)]
[(name1, value111), (name2, value222),(name4, value44)]

我想将它保存到 csv 中(name1..name4 是 csv 的标题),请任何人帮助我如何用 apache spark 实现它2.4.0

name1    | name2     | name3    | name4
value1   | value2    |value3    | null
value11  | value22   |value33   | null
value111 | value222  |null      | value444
4

1 回答 1

2

我调整了您的示例并添加了一些中间值以帮助完成每个步骤:

  // define the labels you want:
  val labels = Seq("name1", "name2", "name3", "name4")
  val result: RDD[Row] = rdd.map { line =>
    // your raw data
    val tuples: immutable.Seq[(String, String)] = 
      scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(1)).child
      .filter(elem => labels.contains(elem.label)) // you can use the label list to filter
      .map(elem => (elem.label -> elem.text)).toList // no change here
    val values: Seq[String] = 
    labels.map(l =>
      // take the values you have a label 
      tuples.find{case (k, v) => k == l}.map(_._2)
      // or just add an empty String
        .getOrElse(""))
    // create a Row
    Row.fromSeq(values)
  }

现在我不确定 - 但本质上你必须将标题 Row 作为第一行插入:

[name1, name2, name3]
于 2019-03-24T09:16:24.203 回答