3

我想创建一个Map计算出现次数的列。

例如:

+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

会导致

+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0, b -> 1.0]|
|  2|                  []|
+---+--------------------+

目前,在 Spark 2.4.6 中,我能够使用 udaf 实现它。

在碰到 Spark3 时,我想知道是否可以摆脱这个 udaf(我尝试使用新方法aggregate但没有成功)

有没有一种有效的方法来做到这一点?(对于效率部分,我可以轻松测试)

4

4 回答 4

3

这里有一个 Spark 3 解决方案:

import org.apache.spark.sql.functions._

df.groupBy($"b",$"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull,struct($"a",$"count"))
      )
    ).as("res")
  )
  .show()

给出:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这里使用的解决方案Aggregator

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String, Map[String,Int], Map[String,Int]] with Serializable {
    def zero: Map[String,Int] = Map.empty.withDefaultValue(0)
    def reduce(b: Map[String,Int], a: String) = if(a!=null) b + (a -> (b(a) + 1)) else b
    def merge(b1: Map[String,Int], b2: Map[String,Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map{ k => (k -> (b1(k) + b2(k))) }.toMap
    }
    def finish(b: Map[String,Int]) = b
    def bufferEncoder: Encoder[Map[String,Int]] = implicitly(ExpressionEncoder[Map[String,Int]])
    def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

给出:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+
于 2020-10-13T19:23:20.093 回答
2

您始终可以collect_list与 UDF 一起使用,但前提是您的分组不太大:

val udf_histo = udf((x:Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

给出:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这应该比 UDAF 更快:Spark 自定义聚合:collect_list+UDF vs UDAF

于 2020-10-13T19:30:06.293 回答
1

我们可以实现这是 spark 2.4

//GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b","a").count

//CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
//AGGREGATE THEM AS ARRAY 

val dfWithArrayOfMaps =  groupedCountDf
.withColumn("newMap",  when($"a".isNotNull, map($"a",$"count")).otherwise(map()))
.groupBy("b").agg(collect_list($"newMap") as "multimap")

//EXPRESSION TO CONVERT ARRAY[MAP] -> MAP

val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")

val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))
于 2020-10-13T17:24:17.160 回答
1

这是一个具有单个groupBy且稍微复杂的 sql 表达式的解决方案。此解决方案适用于 Spark 2.4+

df.groupBy("b")
  .agg(expr("sort_array(collect_set(a)) as set"),
       expr("sort_array(collect_list(a)) as list"))
  .withColumn("res",
       expr("map_from_arrays(set,transform(set, x -> size(filter(list, y -> y=x))))"))
  .show()

输出:

+---+------+---------+----------------+
|  b|   set|     list|             res|
+---+------+---------+----------------+
|  1|[a, b]|[a, a, b]|[a -> 2, b -> 1]|
|  2|    []|       []|              []|
+---+------+---------+----------------+

这个想法是从列中收集数据a两次:一次进入集合,一次进入列表。然后在集合中每个元素的变换的帮助下,计算列表中特定元素的出现次数。最后,将集合和元素的数量与map_from_arrays结合起来。

但是我不能说这种方法是否真的比 UDAF 快。

于 2020-10-13T18:48:31.957 回答