1

我需要重新排序来自两个主题的数据(使用外连接合并)。StateStore使用 a保持最新序列并使用重新排序的消息修改下游流值是否是一种好习惯。

简化问题:

(来自主题 A 的序列,来自主题 B 的序列)-> 要输出的新序列(将当前序列保留在 中StateStore

(10,100) -> 1

(11,101) -> 2

(12,102) -> 3

(...,...) -> ...

新序列将存储为 stateStore 中键“currentSeq”的值。该序列将在每条消息上递增并存储回 stateStore。

4

1 回答 1

2

您应该使用具有已注册(可能是自定义)状态的处理器 API。

您还可以使用 DSL 将处理器 API 与 DSL 混合匹配process()transform()或者transformValue()引用状态存储(按名称)。

于 2016-11-28T20:44:02.547 回答