我在加载许多 XML 文件后创建数据。每个 xml 文件都有一个唯一字段fun:DataPartitionId
,我从一个 XML 文件创建了许多行。
现在我想为fun:DataPartitionId
XML 生成的行中的每一行添加它。
例如,假设第一个 XML 有 100 行,那么每 100 行将具有相同fun:DataPartitionId
的字段。
fun:DataPartitionId
作为每个 XML 中的标头文件也是如此。
这就是我正在做的。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf { (DataPartition: String) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
val getFFActionParent = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "I|!|"
else "D|!|"
}
val getFFActionChild = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "O|!|"
else "D|!|"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition=getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df =dfContentItem.withColumn("DataPartition",dfDataPartition)
df.show()