3

Apache Beam 最近通过注释引入了状态单元StateSpec@StateId部分支持 Apache Flink 和 Google Cloud Dataflow。

我找不到任何有关将其与GlobalWindow. 特别是,有没有办法有一个“状态垃圾收集”机制来摆脱根据某些配置一段时间未见的键的状态,同时仍然保持键的单一历史状态是见过的经常够吗?

或者,在这种情况下使用的状态量是否会发散,无法回收与一段时间未见的键对应的状态?

我还对 Apache Flink 或 Google Cloud Dataflow 是否支持潜在的解决方案感兴趣。

Flink 和直接运行器似乎有一些用于“状态 GC”的代码,但我不确定它的作用以及在使用全局窗口时是否相关。

4

2 回答 2

4

状态可以在窗口过期后的某个时间点由 Beam runner 自动垃圾收集 - 当输入水印超过窗口末尾允许的延迟时,因此所有进一步的输入都是可丢弃的。确切的细节取决于跑步者。

正如您正确确定的那样,全局窗口可能永远不会过期。那么这个状态的自动收集将不会被调用。对于有界数据,包括流失场景,它实际上会过期,但对于永久无界数据源,它不会。

如果您在 Global 窗口中对此类数据进行有状态处理,您可以使用用户定义的计时器(通过@TimerId@OnTimerTimerSpec- 我还没有写过关于这些的博客)在您选择的超时后清除状态。如果状态代表某种聚合,那么无论如何您都需要一个计时器来确保您的数据不会滞留在状态中。

这是他们使用的一个简单示例:

new DoFn<Foo, Baz>() {

  private static final String MY_TIMER = "my-timer";
  private static final String MY_STATE = "my-state";

  @StateId(MY_STATE)
  private final StateSpec<ValueState<Bizzle>> =
      StateSpec.value(Bizzle.coder());

  @TimerId(MY_TIMER)
  private final TimerSpec myTimer =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState,
      @TimerId(MY_TIMER) Timer myTimer) {
    bizzleState.write(...);
    myTimer.setForNowPlus(...);
  }

  @OnTimer(MY_TIMER)
  public void onMyTimer(
      OnTimerContext context,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
    context.output(... bizzleState.read() ...);
    bizzleState.clear();
  }
}
于 2017-03-13T22:08:45.877 回答
1

如果您使用GlobalWindows. 仅当您使用某些非全局窗口时,才会在水印通过窗口末尾加上允许的延迟后进行垃圾收集。

如果您必须使用,您可以做的GlobalWindows是手动将last update timestamp. 然后,您将定期设置一个计时器,您可以在其中根据当前时间检查此时间戳,并在必要时删除状态。@OnTimer您将在第一次遇到键时设置此计时器(您可以从缺少时间戳状态中看到),然后在方法中重新设置它。

于 2017-03-13T21:15:00.130 回答