我有一个微妙的 Spark 问题,我无法解决问题。

我们有两个 RDD(来自 Cassandra)。RDD1 包含Actions,RDD2 包含Historic数据。两者都有一个可以匹配/加入的 id。但问题是这两个表有一个 N:N 关系。Actions包含具有相同 id 的多行,Historic. 以下是两个表中的一些示例日期。

Actions 时间实际上是一个时间戳

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic set_at 实际上是一个时间戳

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75


1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1   |  50   # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2   |  50   # 125 - 75  for Actions#3 with time 12:30 because H. was in that time at 75

如果不对庞大的数据集进行大量迭代,我就无法提出一个感觉正确的好解决方案。我总是要考虑从Historic集合中创建一个范围,然后以某种方式检查是否Actions适合该范围,例如(11:00 - 12:15)来进行计算。但这对我来说似乎很慢。有没有更有效的方法来做到这一点?在我看来,这种问题可能很流行,但我还没有找到任何提示。你将如何在 spark 中解决这个问题?


case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

.map( row => ( row.id, row ) )
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple

3 回答 3



给定案例类Action(id, time, x)Historic(id, time, y)

  • 加入带有历史记录的动作(这可能很重)
  • 过滤与给定操作无关的所有历史数据
  • 通过 (id,time) 键入结果 - 在不同时间区分相同的键
  • 通过操作将历史记录减少到最大值,为我们留下给定操作的相关历史记录

在 Spark 中:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) =>  (a1, if (h1.t>h2.t) h1 else h2)}

// we are done, let's produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}


Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))


于 2014-11-27T00:52:57.877 回答


首先我们扩展我们的case class Historic

case class Historic(id: String, set_at: Long, valueY: Int) {
  val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn't provides something like this with similar operations we'll need a few lines later
  set_at_map.put(0, valueY) // Means from the beginning of Epoch ...
  set_at_map.put(set_at, valueY) // .. to the set_at date

  // This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://stackoverflow.com/a/13400317/1209327
  def getHistoricValue(date: Long) : Option[Int] = {
    var e = set_at_map.floorEntry(date)                                   
    if (e != null && e.getValue == null) {                                  
      e = set_at_map.lowerEntry(date)                                     
    if ( e == null ) None else e.getValue()


val historicRDD = sc.cassandraTable[Historic](...)
  .map( row => ( row.id, row ) )
  .reduceByKey( (row1, row2) =>  {
    row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id

// Now we load the Actions and map it by id as we did with Historic
val actionsRDD = sc.cassandraTable[Actions](...)
  .map( row => ( row.id, row ) )

// Now both RDDs have the same key and we can join them
val fin = actionsRDD.join(historicRDD)
  .map( row => {
    ( row._1.id, 
        row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp

我对 Scala 完全陌生,所以如果我们可以在某个地方改进此代码,请告诉我。

于 2014-11-26T15:45:39.540 回答

我知道这个问题已经得到解答,但我想添加另一个对我有用的解决方案 -


id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75
  1. 工会ActionsHistoric
    编号 | 时间 | 值X | 记录类型
    1 | 12:05 | 500 | 行动
    1 | 12:30 | 500 | 行动
    2 | 12:30 | 125 | 行动
    1 | 11:00 | 400 | 历史性
    1 | 12:15 | 450 | 历史性
    2 | 12:20 | 50 | 历史性
    2 | 12:25 | 75 | 历史性
  1. 编写一个自定义分区器并使用repartitionAndSortWithinPartitions进行分区id,但排序为time

    分区 1
    1 | 11:00 | 400 | 历史性
    1 | 12:05 | 500 | 行动
    1 | 12:15 | 450 | 历史性
    1 | 12:30 | 500 | 行动
    分区 2
    2 | 12:20 | 50 | 历史性
    2 | 12:25 | 75 | 历史性
    2 | 12:30 | 125 | 行动

  2. 遍历每个分区的记录。

如果它是一条记录,请将其添加到映射中,或者如果它已经具有该 ID,则更新映射 -使用每个分区的映射来Historical跟踪最新的valueY每个。id



Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75



于 2017-03-25T08:26:40.383 回答