0

我在加载许多 XML 文件后创建数据。每个 xml 文件都有一个唯一字段fun:DataPartitionId ,我从一个 XML 文件创建了许多行。

现在我想为fun:DataPartitionIdXML 生成的行中的每一行添加它。

例如,假设第一个 XML 有 100 行,那么每 100 行将具有相同fun:DataPartitionId的字段。

fun:DataPartitionId作为每个 XML 中的标头文件也是如此。

这就是我正在做的。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf


val getDataPartition =  udf { (DataPartition: String) => 
    if (DataPartition=="1") "SelfSourcedPublic"
    else  if (DataPartition=="2") "Japan"
    else  if (DataPartition=="3") "SelfSourcedPrivate"
    else "ThirdPartyPrivate"
}

val getFFActionParent =  udf { (FFAction: String) => 
    if (FFAction=="Insert") "I|!|"
    else if (FFAction=="Overwrite") "I|!|"
    else "D|!|" 
}

val getFFActionChild =  udf { (FFAction: String) => 
    if (FFAction=="Insert") "I|!|"
    else if (FFAction=="Overwrite") "O|!|"
    else "D|!|" 
}

val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition=getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))


val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df =dfContentItem.withColumn("DataPartition",dfDataPartition)
df.show()
4

1 回答 1

2

当您使用阅读xml文件时

val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")

DataParitionId列被读作Long

fun:DataPartitionId: long (nullable = true)

所以你应该将udf功能更改为

val getDataPartition =  udf { (DataPartition: Long) =>
  if (DataPartition== 1) "SelfSourcedPublic"
  else  if (DataPartition== 2) "Japan"
  else  if (DataPartition== 3) "SelfSourcedPrivate"
  else "ThirdPartyPrivate"
}

如果可能,您应该使用 when 函数而不是 udf 函数来提高处理速度和内存使用率

现在我想为来自 xml 的结果行中的每一行添加这个 fun:DataPartitionId 。

你的错误是你忘记了select那个特定的列,所以下面的代码

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")

应该

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")

然后您可以应用该udf功能

val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))

所以作为一个整体的工作代码应该是

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf


val getDataPartition =  udf { (DataPartition: Long) => 
    if (DataPartition=="1") "SelfSourcedPublic"
    else  if (DataPartition=="2") "Japan"
    else  if (DataPartition=="3") "SelfSourcedPrivate"
    else "ThirdPartyPrivate"
}

val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
df.show(false)

您可以继续执行其余代码。

于 2018-02-10T05:01:01.133 回答