2

我正在寻找在 Spark DStream 中累积最后 N 条消息的最佳解决方案。我还想指定要保留的消息数。

例如,给定以下流,我想保留最后 3 个元素:

Iteration  New message  Downstream
1          A            [A]
2          B            [A, B]
3          C            [A, B, C]
4          D            [B, C, D]

到目前为止,我正在研究 DStream 上的以下方法:

  1. updateStateByKey:鉴于所有消息都具有相同的密钥,我可以这样做。但看起来有点奇怪,为什么这需要知道密钥。
  2. mapWithState:Scala 中的 API 对于这么简单的事情来说太乏味了
  3. window: 似乎没有做这项工作,它还需要一个时间值来进行窗口化,而不是最后一个元素数
  4. 累加器:尚未真正使用Spark 文档中的累加器

实现这一目标的最佳解决方案是什么?

4

1 回答 1

1

mapWithState正是您所需要的,而且绝对不会太乏味:

case class Message(x: String)
def statefulTransformation(key: Int,
                           value: Option[Message],
                           state: State[mutable.MutableList[Message]]): Option[Message] = {
  def updateState(value: Message): Message = {
    val updatedList =
      state
        .getOption()
        .map(list => if (list.size > 3) list.drop(1) :+ value else list :+ value)
      .getOrElse(mutable.MutableList(value))

    state.update(updatedList)
    value
  }

  value.map(updateState)
}

现在你需要的是:

val stateSpec = StateSpec.function(statefulTransformation _)
dStream.mapWithState(stateSpec)

旁注 - 我用于mutable.MutableList恒定时间追加。

于 2016-08-16T07:03:01.500 回答