0

我有一个带有不同键的消息流。对于每个键,我想创建一个事件时间会话窗口并仅在以下情况下对其进行一些处理:

  • MIN_EVENTS窗口中累积的事件数(本质上是键控状态)

对于每个键,MIN_EVENTS都是不同的,并且可能在运行时发生变化。我很难实现这一点。特别是,我正在实现这个逻辑,如下所示:

        inputStream.keyBy(key).
        window(EventTimeSessionWindow(INACTIVITY_PERIOD).
        trigger(new MyCustomCountTrigger()).
        apply(new MyProcessFn())

我正在尝试创建一个MyCustomCountTrigger()应该能够从状态存储中读取的自定义,例如MapState<String, Integer> stateStore映射key到它的MIN_EVENTS参数。我知道我可以使用TriggerContext ctx所有触发器都可用的对象访问状态存储。

如何从 CountTrigger() 类外部初始化此状态存储?我还没有找到这样做的例子。

4

1 回答 1

0

您可以根据发送到 Trigger 类的构造函数的参数初始化状态。但是您不能从该类之外访问该状态。

如果您需要更大的灵活性,我建议您使用进程函数而不是窗口。

于 2020-02-03T07:21:42.153 回答