我有一个带有不同键的消息流。对于每个键,我想创建一个事件时间会话窗口并仅在以下情况下对其进行一些处理:
MIN_EVENTS
窗口中累积的事件数(本质上是键控状态)
对于每个键,MIN_EVENTS
都是不同的,并且可能在运行时发生变化。我很难实现这一点。特别是,我正在实现这个逻辑,如下所示:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
我正在尝试创建一个MyCustomCountTrigger()
应该能够从状态存储中读取的自定义,例如MapState<String, Integer> stateStore
映射key
到它的MIN_EVENTS
参数。我知道我可以使用TriggerContext ctx
所有触发器都可用的对象访问状态存储。
如何从 CountTrigger() 类外部初始化此状态存储?我还没有找到这样做的例子。