9

我有一个处理器,它与 StateStore 交互以过滤并对消息执行复杂的逻辑。在process(key,value)context.forward(key,value)用来发送我需要的键和值的方法中。出于调试目的,我还打印了这些。

我有一个 KStream mergedStream,它是由其他两个流的连接产生的。我想将处理器应用于该流的记录。我通过以下方式实现了这一目标:mergedStream.process(myprocessor,"stateStoreName")

当我启动这个程序时,我可以看到要打印到控制台的正确值。但是,如果我使用主题上的值将 mergeStream 发送到mergedStream.to("topic")主题,则不是我在处理器中转发的值,而是原始值。

我使用 kafka-streams 0.10.1.0。

将我在处理器中转发到另一个流的值的最佳方法是什么?

是否可以将处理器 APIKStream DSL创建的流混合?

4

1 回答 1

14

短的:

为了解决您的问题,您也可以使用它transform(...)来代替process(...)DSL 中的处理器 API 访问权限。

如果您使用process(...)您将处理器应用于流 - 但是,这是一个“终止”(或接收器)操作(它的返回类型是void),即它不返回任何结果(这里“接收器”仅表示运算符没有后继——这并不意味着任何结果都写在某处!)

此外,如果您调用mergedStream.process(...)并且mergedStream.to(...)您基本上分支和复制您的流并将一份副本发送给每个下游运营商(即,一份process副本到to.

混合 DSL 和处理器 API 是绝对可能的(你已经这样做了;))。但是,使用process(...)你不能forward(...)在 DSL 中使用你的数据——如果你想使用处理器 API 结果,你可以使用transform(...)而不是process(...).

于 2016-11-28T05:58:04.303 回答