1

目前,我们在 Kafka Streams 应用程序中使用以下内容:

streamsBuilder.table(inputTopic)
              .join(...)
              .mapValues(valueMapper) // <-- this causes another state store
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)

我刚刚意识到,加入后的 mapValues 创建了一个额外的状态存储。

如果 valueMapper 中的计算在某种程度上是微不足道的(例如删除对象中的字段等),那么避免额外的 statestore 不是更好吗?我是否需要转换为 KStream 并使用 KStream.mapValues 来避免 stateStore,即

streamsBuilder.table(inputTopic)
              .join(...)
              .toStream
              .mapValues(valueMapper) // <-- no more additional statestore
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)

还是有更好的选择来在加入后应用附加映射?

4

1 回答 1

1

为什么要在连接步骤之后使用 mapValues?如果可以在联接的 ValueJoiner 中处理该逻辑。

streamsBuilder.table(inputTopic)
              .join( anotherTable, (a ,b) ->  c )  <--- Here you can perform any mapping process
              .toStream
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)
于 2021-10-22T16:30:52.727 回答