我有一个从 spark 数据框生成的事件列表,如下所示。我正在使用带有 Scala 的 Spark 2.2.0。
val events = df.select($"event", hour($"time") as "hour", to_date($"time", "yyyy-MM-dd") as "day")
+-----+-----+----------+
|event|hour | day|
+-----+-----+----------+
|event1| 18|2015-02-05|
|event1| 17|2015-02-19|
|event5| 18|2015-02-02|
|event5| 19|2015-02-02|
|event1| 1|2015-03-17|
|event1| 0|2015-02-03|
|event1| 20|2015-02-02|
|event1| 22|2015-02-02|
|event1| 23|2015-02-02|
|event1| 18|2015-02-09|
|event1| 19|2015-02-09|
|event1| 21|2015-02-09|
|event1| 21|2015-04-06|
|event1| 23|2015-02-09|
|event1| 20|2015-02-16|
|event2| 19|2015-02-12|
|event3| 18|2015-02-18|
|event1| 22|2015-02-16|
|event2| 17|2015-02-04|
|event1| 23|2015-02-16|
+-----+----+----------+
only showing top 20 rows
我需要创建每小时存储桶并计算每小时发生的事件数。所以我的方法是创建存储桶(其中 24 个)并计算其特定时间段中的事件,如下所示。
val splits = (0 to 24).map(_ * 1.0).toArray
val bucketizer = new Bucketizer()
.setInputCol("hour")
.setOutputCol("bucket")
.setSplits(splits)
val bucket = bucketizer.transform(events)
val result = bucket.groupBy($"day", $"bucket").agg(count($"event").as("count")).orderBy(asc("bucket"))
result.filter($"day" === "2015-05-21").orderBy(asc("bucket")).show()
上面代码的结果是
+----------+------+-----+
| day|bucket|count|
+----------+------+-----+
|2015-05-21| 0.0| 1|
|2015-05-21| 2.0| 1|
|2015-05-21| 11.0| 1|
|2015-05-21| 17.0| 1|
|2015-05-21| 18.0| 4|
|2015-05-21| 19.0| 4|
|2015-05-21| 21.0| 1|
|2015-05-21| 22.0| 3|
|2015-05-21| 23.0| 1|
+----------+------+-----+
哪个是对的。然而,我所期望的输出是这样的:
+----------+------+-----+
| day|bucket|count|
+----------+------+-----+
|2015-05-21| 0.0| 1|
|2015-05-21| 1.0| 0|
|2015-05-21| 2.0| 1|
|2015-05-21| 3.0| 0|
|2015-05-21| 4.0| 0|
|2015-05-21| 5.0| 0|
:
:
|2015-05-21| 11.0| 1|
|2015-05-21| 12.0| 0|
|2015-05-21| 13.0| 0|
:
:
|2015-05-21| 17.0| 1|
|2015-05-21| 18.0| 4|
|2015-05-21| 19.0| 4|
|2015-05-21| 20.0| 0|
|2015-05-21| 21.0| 1|
|2015-05-21| 22.0| 3|
|2015-05-21| 23.0| 1|
+----------+------+-----+
基本上,没有事件的箱(桶)应该填充 0。知道如何实现吗?
谢谢!