我不确定你想用 while 循环做什么。无论如何,您可以使用 REPL 检查您用作条件的表达式是 aColumn
而不是 a Boolean
,因此是异常。
> size(col("categoriesRaw")) !== 0
res1: org.apache.spark.sql.Column = (NOT (size(categoriesRaw) = 0))
基本上,这是一个需要由 SparkSQL 在 awhere
或select
任何其他使用 Columns 的函数中求值的表达式。
不过,使用您的 spark 代码,您几乎就在那里,您只需要添加一个groupBy
即可到达您想要的位置。让我们从创建数据开始。
import spark.implicits._
val users = Seq( "user 1" -> Map("home & personal items > interior" -> 1,
"vehicles > cars" -> 1),
"user 2" -> Map("vehicles > cars" -> 3))
val df = users.toDF("user", "categoriesRaw")
然后,您不需要 while 循环来遍历映射的所有值。explode
为你做的正是:
val explodedDf = df.select( explode('categoriesRaw) )
explodedDf.show(false)
+--------------------------------+-----+
|key |value|
+--------------------------------+-----+
|home & personal items > interior|1 |
|vehicles > cars |1 |
|vehicles > cars |3 |
+--------------------------------+-----+
最后,你可以使用 groupBy add 得到你想要的。
explodedDf
.select('key as "categ", 'value as "number_of_events")
.groupBy("categ")
.agg(count('*), sum('number_of_events))
.show(false)
+--------------------------------+--------+---------------------+
|categ |count(1)|sum(number_of_events)|
+--------------------------------+--------+---------------------+
|home & personal items > interior|1 |1 |
|vehicles > cars |2 |4 |
+--------------------------------+--------+---------------------+
注意:我不确定您是要计算会话(第一列)还是事件(第二列),所以我计算了两者。