我有一个处理器,它与 StateStore 交互以过滤并对消息执行复杂的逻辑。在process(key,value)
我context.forward(key,value)
用来发送我需要的键和值的方法中。出于调试目的,我还打印了这些。
我有一个 KStream mergedStream
,它是由其他两个流的连接产生的。我想将处理器应用于该流的记录。我通过以下方式实现了这一目标:mergedStream.process(myprocessor,"stateStoreName")
当我启动这个程序时,我可以看到要打印到控制台的正确值。但是,如果我使用主题上的值将 mergeStream 发送到mergedStream.to("topic")
主题,则不是我在处理器中转发的值,而是原始值。
我使用 kafka-streams 0.10.1.0。
将我在处理器中转发到另一个流的值的最佳方法是什么?
是否可以将处理器 API与KStream DSL创建的流混合?