因为我是 Spark/Scala 的新手,所以我正在处理我的第一个问题,我想知道是否有一种简单的方法可以通过 Spark 本地操作和 scala 方法来处理它们,因为也许我遗漏了一些东西。
假设我有一个文本文件日志(以前排序),每行代表人|时间
Peter|5:45
Peter|6:05
Peter|6:27
Peter|6:58
Peter|6:59
Peter|7:59
Mark|7:40
Mark|7:55
Mark|8:30
现在我可以创建一个存储人员和时间的 RDD:
val file = sc.textFile(logFile, 2).cache()
val log = file.map(x => (x.split('|').lift(0).get,x.split('|').lift(1).get))
我想要的是一个新值(间隔),表示连续记录(对于同一个人)属于相同的时间间隔,如果它们的差异小于 30 分钟。
在下表中,您可以看到:
- 记录 #1 和 #2 相差 20 分钟,因此它们处于相同的区间
- 记录 #2 和 #3 相差 22 分钟,因此它们处于相同的间隔
- 记录 #3 和 #4 相差 31 分钟,因此它们不在同一时间间隔内(使 #1 #2 #3 成为第一个时间间隔,而 #4 成为下一个时间间隔的第一条记录)
# Person Time "Lag" Diff Note Interval
1 Peter 5:45 - - first row Peter Peter started at 5:45
2 Peter 6:05 5:45 20 <30 mins Peter started at 5:45
3 Peter 6:27 6:05 22 <30 mins Peter started at 5:45
4 Peter 6:58 6:27 31 >30 mins Peter started at 6:58
5 Peter 6:59 6:58 1 <30 mins Peter started at 6:58
6 Peter 7:59 6:59 60 >30 mins Peter started at 7:59
7 Mark 7:40 - - first row Mark Mark started at 7:40
8 Mark 7:55 7:40 15 <30 mins Mark started at 7:40
9 Mark 8:30 7:55 35 >30 mins Mark started at 8:30
所以我的RDD应该是这样的:
Person Time Interval
Peter 5:45 Peter@5:45
Peter 6:05 Peter@5:45
Peter 6:27 Peter@5:45
Peter 6:58 Peter@6:58
Peter 6:59 Peter@6:58
Peter 7:59 Peter@7:59
Mark 7:40 Mark@7:40
Mark 7:55 Mark@7:40
Mark 8:30 Mark@8:30
这是否可以通过合理的努力实现,或者我是否从 Spark 本机操作与 Scala 方法相结合中假装太多?
也许使用 Spark-SQL 来操作我的数据集可能会更容易,因为我更熟悉 SQL,但我也想熟悉 Spark 核心。
每一个建议、提示、代码示例都将不胜感激。
提前致谢
法郎