14

我有一个微妙的 Spark 问题,我无法解决问题。

我们有两个 RDD(来自 Cassandra)。RDD1 包含Actions,RDD2 包含Historic数据。两者都有一个可以匹配/加入的 id。但问题是这两个表有一个 N:N 关系。Actions包含具有相同 id 的多行,Historic. 以下是两个表中的一些示例日期。

Actions 时间实际上是一个时间戳

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic set_at 实际上是一个时间戳

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75

我们如何才能以某种方式连接这两个表,得到这样的结果

1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1   |  50   # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2   |  50   # 125 - 75  for Actions#3 with time 12:30 because H. was in that time at 75

如果不对庞大的数据集进行大量迭代,我就无法提出一个感觉正确的好解决方案。我总是要考虑从Historic集合中创建一个范围,然后以某种方式检查是否Actions适合该范围,例如(11:00 - 12:15)来进行计算。但这对我来说似乎很慢。有没有更有效的方法来做到这一点?在我看来,这种问题可能很流行,但我还没有找到任何提示。你将如何在 spark 中解决这个问题?

到目前为止我目前的尝试(一半完成的代码)

case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...) 
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple
4

3 回答 3

4

这是一个有趣的问题。我还花了一些时间想出一种方法。这就是我想出的:

给定案例类Action(id, time, x)Historic(id, time, y)

  • 加入带有历史记录的动作(这可能很重)
  • 过滤与给定操作无关的所有历史数据
  • 通过 (id,time) 键入结果 - 在不同时间区分相同的键
  • 通过操作将历史记录减少到最大值,为我们留下给定操作的相关历史记录

在 Spark 中:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) =>  (a1, if (h1.t>h2.t) h1 else h2)}

// we are done, let's produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}

使用上面提供的数据,报告如下所示:

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(我将时间转换为秒以获得简单的时间戳)

于 2014-11-27T00:52:57.877 回答
0

经过几个小时的思考、尝试和失败,我想出了这个解决方案。我不确定这是否有任何好处,但由于缺乏其他选择,这是我的解决方案。

首先我们扩展我们的case class Historic

case class Historic(id: String, set_at: Long, valueY: Int) {
  val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn't provides something like this with similar operations we'll need a few lines later
  set_at_map.put(0, valueY) // Means from the beginning of Epoch ...
  set_at_map.put(set_at, valueY) // .. to the set_at date

  // This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://stackoverflow.com/a/13400317/1209327
  def getHistoricValue(date: Long) : Option[Int] = {
    var e = set_at_map.floorEntry(date)                                   
    if (e != null && e.getValue == null) {                                  
      e = set_at_map.lowerEntry(date)                                     
    }                                                                         
    if ( e == null ) None else e.getValue()
  }
}

案例课程已准备就绪,现在我们将其付诸实施

val historicRDD = sc.cassandraTable[Historic](...)
  .map( row => ( row.id, row ) )
  .reduceByKey( (row1, row2) =>  {
    row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id
    row1
  })

// Now we load the Actions and map it by id as we did with Historic
val actionsRDD = sc.cassandraTable[Actions](...)
  .map( row => ( row.id, row ) )

// Now both RDDs have the same key and we can join them
val fin = actionsRDD.join(historicRDD)
  .map( row => {
    ( row._1.id, 
      (
        row._2._1.id, 
        row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp
      )
    )
  })

我对 Scala 完全陌生,所以如果我们可以在某个地方改进此代码,请告诉我。

于 2014-11-26T15:45:39.540 回答
0

我知道这个问题已经得到解答,但我想添加另一个对我有用的解决方案 -

你的数据——

Actions 
id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic 
id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75
  1. 工会ActionsHistoric
    结合
    编号 | 时间 | 值X | 记录类型
    1 | 12:05 | 500 | 行动
    1 | 12:30 | 500 | 行动
    2 | 12:30 | 125 | 行动
    1 | 11:00 | 400 | 历史性
    1 | 12:15 | 450 | 历史性
    2 | 12:20 | 50 | 历史性
    2 | 12:25 | 75 | 历史性
  1. 编写一个自定义分区器并使用repartitionAndSortWithinPartitions进行分区id,但排序为time

    分区 1
    1 | 11:00 | 400 | 历史性
    1 | 12:05 | 500 | 行动
    1 | 12:15 | 450 | 历史性
    1 | 12:30 | 500 | 行动
    分区 2
    2 | 12:20 | 50 | 历史性
    2 | 12:25 | 75 | 历史性
    2 | 12:30 | 125 | 行动
    

  2. 遍历每个分区的记录。

如果它是一条记录,请将其添加到映射中,或者如果它已经具有该 ID,则更新映射 -使用每个分区的映射来Historical跟踪最新的valueY每个。id

如果是Action记录,valueY则从地图中获取值并从中减去valueX

一张地图M

Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75

您可以尝试分区和排序time,但您需要稍后合并分区。这可能会增加一些复杂性。

这一点,我发现它比我们在使用时间范围加入时通常得到的多对多加入更可取。

于 2017-03-25T08:26:40.383 回答