I want to transpose the table below using Spark Scala, without the pivot function

I am using Spark 1.5.1, and pivot is not supported in 1.5.1. Please suggest a suitable way to transpose the following table:

Customer Day   Sales
1        Mon    12
1        Tue    10
1        Thu    15
1        Fri     2
2        Sun    10
2        Wed     5
2        Thu     4
2        Fri     3

Output table:

Customer Sun Mon Tue Wed Thu Fri
       1   0  12  10   0  15   2
       2  10   0   0   5   4   3

The following code does not work, because I am on Spark 1.5.1 and the pivot function is only available from Spark 1.6:

    val Trans = Cust_Sales.groupBy("Customer").pivot("Day").sum("Sales")

3 Answers


Not sure how efficient this is, but you can use collect to get all the distinct days, add a column for each of them, and then use groupBy and sum:

// get distinct days from data (this assumes there are not too many of them):
val days: Array[String] = df.select("Day")
    .distinct()
    .collect()
    .map(_.getAs[String]("Day"))

// add column for each day with the Sale value if days match:
val withDayColumns = days.foldLeft(df) { 
    case (data, day) => data.selectExpr("*", s"IF(Day = '$day', Sales, 0) AS $day")
}

// wrap it up 
val result = withDayColumns
   .drop("Day")
   .drop("Sales")
   .groupBy("Customer")
   .sum(days: _*)

result.show()

which prints (almost) what you want:

+--------+--------+--------+--------+--------+--------+--------+
|Customer|sum(Tue)|sum(Thu)|sum(Sun)|sum(Fri)|sum(Mon)|sum(Wed)|
+--------+--------+--------+--------+--------+--------+--------+
|       1|      10|      15|       0|       2|      12|       0|
|       2|       0|       4|      10|       3|       0|       5|
+--------+--------+--------+--------+--------+--------+--------+

I'll leave it to you to rename and reorder the columns if needed.
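
If you want the exact column names and order from the question, a minimal follow-up sketch (assuming the six day names present in this sample data) could be:

// rename each sum(Day) column back to its plain day name, then reorder:
val dayOrder = Seq("Sun", "Mon", "Tue", "Wed", "Thu", "Fri")
val renamed = dayOrder.foldLeft(result) {
    case (data, day) => data.withColumnRenamed(s"sum($day)", day)
}
renamed.select("Customer", dayOrder: _*).show()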

Answered 2016-03-25T07:49:23.400

If you are using Python, the code below might help. Suppose you want to transpose the Spark DataFrame df:

# collect the whole DataFrame to the driver and transpose it with pandas
# (only practical when the data fits in driver memory):
pandas_df = df.toPandas().transpose().reset_index()
# convert the transposed pandas DataFrame back into a Spark DataFrame
transposed_df = sqlContext.createDataFrame(pandas_df)
transposed_df.show()

Answered 2017-03-21T11:08:57.263

Consider a data frame with 6 columns, where we want to group by the first 4 columns and pivot on col5 while aggregating on col6 (say, summing it). If you could use Spark 1.6, you would write:

val pivotedDf = df_to_pivot
    .groupBy(col1,col2,col3,col4)
    .pivot(col5)
    .agg(sum(col6))

Assuming you cannot use Spark 1.6, here is code (for Spark 1.5) that produces the same output without the built-in pivot function:

import scala.collection.SortedMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Extracting the distinct col5 values to create the new columns
val distinctCol5Values = df_to_pivot
    .select(col(col5))
    .distinct
    .sort(col5) // ensure that the output columns are in a consistent logical order
    .map(_.getString(0))
    .collect()
    .toSeq

// Grouping the data frame to be pivoted on col1-col4
val pivotedAgg = df_to_pivot.rdd
    .groupBy { row =>
      (row.getString(col1Index),
       row.getDate(col2Index),
       row.getDate(col3Index),
       row.getString(col4Index))
    }

// A template of (col5 value -> 0.0) for the columns that will be created;
// a SortedMap ensures the value order matches the column order used below
val zeroedCol5ValuesMap = SortedMap(distinctCol5Values.map(ft => (ft, 0.0)): _*)

// Pivoting the data on col5 by walking through each group
val pivotedRDD = pivotedAgg.map { case (key, rows) =>
  // start every group from a fresh copy of the zeroed map
  var col5ValuesMap = zeroedCol5ValuesMap
  rows.foreach { row =>
    // accumulate col6 into the slot named by col5
    // (change this part according to the aggregation you want)
    col5ValuesMap = col5ValuesMap.updated(
      row.getString(col5Index),
      col5ValuesMap.getOrElse(row.getString(col5Index), 0.0) + row.getDouble(col6Index))
  }
  Row.fromSeq(Seq(key._1, key._2, key._3, key._4) ++ col5ValuesMap.values.toSeq)
}

// Constructing the StructFields for the new columns
val colTypesStruct = zeroedCol5ValuesMap.map(colName => StructField(colName._1, DoubleType))
// Prepending the StructFields of the first four columns to the new columns' struct
val opStructType = StructType(Seq(StructField(col1Name, StringType),
                                  StructField(col2Name, DateType),
                                  StructField(col3Name, DateType),
                                  StructField(col4Name, StringType)) ++ colTypesStruct)

// Creating the final data frame
val pivotedDF = sqlContext.createDataFrame(pivotedRDD, opStructType)
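
The column name variables (col1 … col4Name) and index variables (col1Index … col6Index) above are placeholders. Purely for illustration, a hypothetical set of bindings (names invented here, not part of the answer) might look like:

// hypothetical placeholder bindings, assuming an input schema of
// (id: String, start: Date, end: Date, region: String, category: String, amount: Double):
val (col1, col2, col3, col4, col5, col6) =
    ("id", "start", "end", "region", "category", "amount")
val (col1Name, col2Name, col3Name, col4Name) = (col1, col2, col3, col4)
val (col1Index, col2Index, col3Index, col4Index, col5Index, col6Index) =
    (0, 1, 2, 3, 4, 5)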

Answered 2018-03-21T12:24:53.010