2

因为我是 Spark/Scala 的新手,所以我正在处理我的第一个问题,我想知道是否有一种简单的方法可以通过 Spark 本地操作和 scala 方法来处理它们,因为也许我遗漏了一些东西。

假设我有一个文本文件日志(以前排序),每行代表人|时间

Peter|5:45 Peter|6:05 Peter|6:27 Peter|6:58 Peter|6:59 Peter|7:59 Mark|7:40 Mark|7:55 Mark|8:30

现在我可以创建一个存储人员和时间的 RDD:

val file = sc.textFile(logFile, 2).cache()
val log = file.map(x => (x.split('|').lift(0).get,x.split('|').lift(1).get))

我想要的是一个新值(间隔),表示连续记录(对于同一个人)属于相同的时间间隔,如果它们的差异小于 30 分钟。

在下表中,您可以看到:

  • 记录 #1 和 #2 相差 20 分钟,因此它们处于相同的区间
  • 记录 #2 和 #3 相差 22 分钟,因此它们处于相同的间隔
  • 记录 #3 和 #4 相差 31 分钟,因此它们不在同一时间间隔内(使 #1 #2 #3 成为第一个时间间隔,而 #4 成为下一个时间间隔的第一条记录)

#   Person   Time   "Lag"     Diff      Note                   Interval
1   Peter    5:45   -         -         first row Peter        Peter started at 5:45
2   Peter    6:05   5:45      20        <30 mins               Peter started at 5:45
3   Peter    6:27   6:05      22        <30 mins               Peter started at 5:45
4   Peter    6:58   6:27      31        >30 mins               Peter started at 6:58
5   Peter    6:59   6:58      1         <30 mins               Peter started at 6:58
6   Peter    7:59   6:59      60        >30 mins               Peter started at 7:59
7   Mark     7:40   -         -         first row Mark         Mark started at 7:40
8   Mark     7:55   7:40      15        <30 mins               Mark started at 7:40
9   Mark     8:30   7:55      35        >30 mins               Mark started at 8:30

所以我的RDD应该是这样的:

 Person   Time   Interval
 Peter    5:45   Peter@5:45
 Peter    6:05   Peter@5:45
 Peter    6:27   Peter@5:45
 Peter    6:58   Peter@6:58
 Peter    6:59   Peter@6:58
 Peter    7:59   Peter@7:59
 Mark     7:40   Mark@7:40
 Mark     7:55   Mark@7:40
 Mark     8:30   Mark@8:30

这是否可以通过合理的努力实现,或者我是否从 Spark 本机操作与 Scala 方法相结合中假装太多?

也许使用 Spark-SQL 来操作我的数据集可能会更容易,因为我更熟悉 SQL,但我也想熟悉 Spark 核心。

每一个建议、提示、代码示例都将不胜感激。

提前致谢

法郎

4

0 回答 0