3

我有一个示例代码

import org.apache.spark.sql.Row
import scala.xml._

object reading_xml {
  def main(args: Array[String]): Unit = {
    //I have 42 Millions of records
    val records = List(
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181104</c5></root>",
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181102</c5></root>",
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181102</c5></root>",
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181106</c5><c6>v6</c6></root>"
    )
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    val df = records.toDF()
    df.show()
    val rdd = df.rdd.map(line => Row.fromSeq(
      "BNK"
    :: scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(0)).child
      .filter(elem =>
        elem.label == "c1" 
        || elem.label == "c2" 
        || elem.label == "c3" 
        || (elem.label == "c5" && elem.text =="20181106")
      ).map(elem =>  elem.label+"@"+elem.text).toList)
    )
    rdd.take(100).foreach(println)

实际输出:

[BNK,c1@v1,c2@v2,c3@v3]
[BNK,c1@v1,c2@v2,c3@v3]
[BNK,c1@v1,c2@v2,c3@v3]
[BNK,c1@v1,c2@v2,c3@v3,c5@20181106]

我期望的是只得到一行结果。

[BNK,c1@v1,c2@v2,c3@v3,c5@20181106]

我的情况有什么问题,或者我错过了关于 scala_xml 的任何理解,以及如何获得预期的结果?

4

3 回答 3

0

您的外部地图需要 4 条记录,它按预期返回 4 条记录。您可能想在最后添加过滤器。

val rdd = df.rdd.map(line => Row.fromSeq(
      "BNK"
    :: scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(0)).child
      .filter(elem =>
        elem.label == "c1" 
        || elem.label == "c2" 
        || elem.label == "c3" 
        || (elem.label == "c5" && elem.text =="20181106")
      ).map(elem =>  elem.label+"@"+elem.text).toList)
    ).filter(line => line.mkString.contains("c1") && line.mkString.contains("c2") &&
      line.mkString.contains("c3")&& line.mkString.contains("c5") && line.mkString.contains("20181106"))

rdd.take(100).foreach(println)

操作:

[BNK,c1@v1,c2@v2,c3@v3,c5@20181106] 
于 2019-03-30T08:38:37.560 回答
0

取决于你想做什么。如果您正在寻找标签 c1、c2、c3、c5 中的任何一个是否具有 20181106 的值,那么您可能想要这样做。

    (elem.label == "c1" || elem.label == "c2" || elem.label == "c3" || elem.label == "c5")
    && elem.text =="20181106"
于 2019-03-28T02:01:08.370 回答
0

可以解析 XML,然后留下所需的节点,然后留下所需值的节点:

val rdd = df.rdd.map(line => scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(0)).child)
  // left only required nodes
  .map(nodeList => nodeList.filter(elem => Seq("c1", "c2", "c3", "c5").contains(elem.label)))
  // find element where "c5" == "20181106"
  .filter(nodeList => nodeList.find(elem => elem.label == "c5" && elem.text == "20181106").isDefined)
  .map(s => Row.fromSeq("BNK" :: s.map(elem => elem.label + "@" + elem.text).toList))
于 2019-03-31T20:17:56.987 回答