目前,我们在 Kafka Streams 应用程序中使用以下内容:
streamsBuilder.table(inputTopic)
.join(...)
.mapValues(valueMapper) // <-- this causes another state store
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
我刚刚意识到,加入后的 mapValues 创建了一个额外的状态存储。
如果 valueMapper 中的计算在某种程度上是微不足道的(例如删除对象中的字段等),那么避免额外的 statestore 不是更好吗?我是否需要转换为 KStream 并使用 KStream.mapValues 来避免 stateStore,即
streamsBuilder.table(inputTopic)
.join(...)
.toStream
.mapValues(valueMapper) // <-- no more additional statestore
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
还是有更好的选择来在加入后应用附加映射?