I have the following sample code:
import org.apache.spark.sql.{Row, SparkSession}
import scala.xml._

object reading_xml {
  def main(args: Array[String]): Unit = {
    // Sample data; the real dataset has about 42 million records.
    val records = List(
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181104</c5></root>",
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181102</c5></root>",
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181102</c5></root>",
      "<root><c1>v1</c1><c2>v2</c2><c3>v3</c3><c4>v4</c4><c5>20181106</c5><c6>v6</c6></root>"
    )

    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val df = records.toDF()
    df.show()

    // Parse each record and keep c1, c2, c3, plus c5 when it equals 20181106.
    val rdd = df.rdd.map(line => Row.fromSeq(
      "BNK"
        :: scala.xml.XML.loadString("<?xml version='1.0' encoding='utf-8'?>" + line(0)).child
          .filter(elem =>
            elem.label == "c1"
              || elem.label == "c2"
              || elem.label == "c3"
              || (elem.label == "c5" && elem.text == "20181106")
          ).map(elem => elem.label + "@" + elem.text).toList)
    )

    rdd.take(100).foreach(println)
  }
}
Actual output:
[BNK,c1@v1,c2@v2,c3@v3]
[BNK,c1@v1,c2@v2,c3@v3]
[BNK,c1@v1,c2@v2,c3@v3]
[BNK,c1@v1,c2@v2,c3@v3,c5@20181106]
What I expected was a single row:
[BNK,c1@v1,c2@v2,c3@v3,c5@20181106]
What is wrong in my case? Am I misunderstanding something about scala_xml, and how can I get the expected result?
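
One likely reading of the behaviour above: the `filter` in the sample code runs over the child elements inside each record, so it only decides which fields appear in a row, while `map` still emits exactly one row per record. Dropping whole records needs a row-level decision. The sketch below is one possible way to express that, not a definitive fix. The names `XmlRowFilter`, `keptLabels`, `extract`, and `wantedDate` are hypothetical and do not come from the original code.

```scala
import scala.xml.XML

// Hypothetical helper: decides per record whether a row should be produced at all.
object XmlRowFilter {
  // Labels that are always kept in an emitted row (taken from the question).
  val keptLabels: Set[String] = Set("c1", "c2", "c3")

  // Returns Some(fields) only when the record has a <c5> equal to wantedDate;
  // returns None for every other record, so the caller can drop it.
  def extract(line: String, wantedDate: String): Option[List[String]] = {
    val children = XML.loadString(line).child
    val matches = children.exists(e => e.label == "c5" && e.text == wantedDate)
    if (!matches) None
    else Some(
      "BNK" :: children
        .filter(e => keptLabels(e.label) || e.label == "c5")
        .map(e => e.label + "@" + e.text)
        .toList
    )
  }
}
```

With the `df` from the question, this could be plugged in as `df.rdd.flatMap(row => XmlRowFilter.extract(row.getString(0), "20181106").map(fields => Row.fromSeq(fields)))`. Because `flatMap` discards every `None`, only records whose `c5` matches would remain. An equivalent approach would be to call `filter` on the RDD before the existing `map`, at the cost of parsing each XML string twice.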