状态可以在窗口过期后的某个时间点由 Beam runner 自动垃圾收集 - 当输入水印超过窗口末尾允许的延迟时,因此所有进一步的输入都是可丢弃的。确切的细节取决于跑步者。
正如您正确确定的那样,全局窗口可能永远不会过期。那么这个状态的自动收集将不会被调用。对于有界数据,包括流失场景,它实际上会过期,但对于永久无界数据源,它不会。
如果您在 Global 窗口中对此类数据进行有状态处理,您可以使用用户定义的计时器(通过@TimerId
、@OnTimer
和TimerSpec
- 我还没有写过关于这些的博客)在您选择的超时后清除状态。如果状态代表某种聚合,那么无论如何您都需要一个计时器来确保您的数据不会滞留在状态中。
这是他们使用的一个简单示例:
new DoFn<Foo, Baz>() {
private static final String MY_TIMER = "my-timer";
private static final String MY_STATE = "my-state";
@StateId(MY_STATE)
private final StateSpec<ValueState<Bizzle>> =
StateSpec.value(Bizzle.coder());
@TimerId(MY_TIMER)
private final TimerSpec myTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext c,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState,
@TimerId(MY_TIMER) Timer myTimer) {
bizzleState.write(...);
myTimer.setForNowPlus(...);
}
@OnTimer(MY_TIMER)
public void onMyTimer(
OnTimerContext context,
@StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
context.output(... bizzleState.read() ...);
bizzleState.clear();
}
}