10

我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应用程序级别的配置,我们还有一些动态排除参数,每个数据都必须通过和过滤。

我看到 flink 没有在所有任务管理器和子任务之间共享的全局状态。拥有一个集中式缓存是一种选择,但对于每个参数,我都必须从缓存中读取它,这会增加延迟。请就处理此类场景的更好方法以及其他应用程序如何处理它提出建议。谢谢。

4

1 回答 1

8

更新正在运行的流应用程序的配置是常见的要求。在 Flink 的 DataStream API 中,这可以使用所谓的CoFlatMapFunction处理两个输入流的方法来完成。其中一个流可以是数据流,另一个可以是控制流。

以下示例展示了如何动态调整过滤掉超过特定长度的字符串的用户函数。

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  override def restoreState(state: Int): Unit = {
    length = state
  }
}

DynLengthFilter用户函数实现了过滤器长度的Checkpointed接口。如果发生故障,此信息会自动恢复。

于 2016-09-26T09:53:58.580 回答