我有一个微妙的 Spark 问题,我无法解决问题。
我们有两个 RDD(来自 Cassandra)。RDD1 包含Actions
,RDD2 包含Historic
数据。两者都有一个可以匹配/加入的 id。但问题是这两个表有一个 N:N 关系。Actions
包含具有相同 id 的多行,Historic
. 以下是两个表中的一些示例日期。
Actions
时间实际上是一个时间戳
id | time | valueX
1 | 12:05 | 500
1 | 12:30 | 500
2 | 12:30 | 125
Historic
set_at 实际上是一个时间戳
id | set_at| valueY
1 | 11:00 | 400
1 | 12:15 | 450
2 | 12:20 | 50
2 | 12:25 | 75
我们如何才能以某种方式连接这两个表,得到这样的结果
1 | 100 # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1 | 50 # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2 | 50 # 125 - 75 for Actions#3 with time 12:30 because H. was in that time at 75
如果不对庞大的数据集进行大量迭代,我就无法提出一个感觉正确的好解决方案。我总是要考虑从Historic
集合中创建一个范围,然后以某种方式检查是否Actions
适合该范围,例如(11:00 - 12:15)来进行计算。但这对我来说似乎很慢。有没有更有效的方法来做到这一点?在我看来,这种问题可能很流行,但我还没有找到任何提示。你将如何在 spark 中解决这个问题?
到目前为止我目前的尝试(一半完成的代码)
case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)
historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...)
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))
// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple