考虑一个有 6 列的数据框,我们希望按前 4 列分组,并在 col5 上进行旋转,同时在 col6 上进行聚合(比如求和)。因此,假设您不能使用 spark 1.6 版本,则可以将以下代码(在 spark 1.5 中)编写为:
val pivotedDf = df_to_pivot
.groupBy(col1,col2,col3,col4)
.pivot(col5)
.agg(sum(col6))
这是具有相同输出但没有使用内置数据透视函数的代码:
import scala.collection.SortedMap
//Extracting the col5 distinct values to create the new columns
val distinctCol5Values = df_to_pivot
.select(col(col5))
.distinct
.sort(col5) // ensure that the output columns are in a consistent logical order
.map(_.getString(0))
.toArray
.toSeq
//Grouping by the data frame to be pivoted on col1-col4
val pivotedAgg = df_to_pivot.rdd
.groupBy{row=>(row.getString(col1Index),
row.getDate(col2Index),
row.getDate(col3Index),
row.getString(col4Index))}
//Initializing a List of tuple of (String, double values) to be filled in the columns that will be created
val pivotColListTuple = distinctCol5Values.map(ft=> (ft,0.0))
// Using Sorted Map to ensure the order is maintained
var distinctCol5ValuesListMap = SortedMap(pivotColListTuple : _*)
//Pivoting the data on col5 by opening the grouped data
val pivotedRDD = pivotedAgg.map{groupedRow=>
distinctCol5ValuesListMap = distinctCol5ValuesListMap.map(ft=> (ft._1,0.0))
groupedRow._2.foreach{row=>
//Updating the distinctCol5ValuesListMap values to reflect the changes
//Change this part accordingly to what you want
distinctCol5ValuesListMap = distinctCol5ValuesListMap.updated(row.getString(col5Index),
distinctCol5ValuesListMap.getOrElse(row.getString(col5Index),0.0)+row.getDouble(col6Index))
}
Row.fromSeq(Seq(groupedRow._1._1,groupedRow._1._2,groupedRow._1._3,groupedRow._1._4) ++ distinctCol5ValuesListMap.values.toSeq)
}
//Consructing the structFields for new columns
val colTypesStruct = distinctCol5ValuesListMap.map(colName=>StructField(colName._1,DoubleType))
//Adding the first four column structFields with the new columns struct
val opStructType = StructType(Seq(StructField(col1Name,StringType),
StructField(col2Name,DateType),
StructField(col3Name,DateType),
StructField(col4Name,StringType)) ++ colTypesStruct )
//Creating the final data frame
val pivotedDF = sqlContext.createDataFrame(pivotedRDD,opStructType)