1

我有一个流,它将消息映射到两个不同的 map() 调用,并进一步被过滤并写入两个不同的主题。

KStream<String, byte[]>[] stream = builder.<String, byte[]>stream("source-topic");

stream.map(logic1OnData).filter(
                (key, value) -> {
                    if (key == null || value == null)
                        return false;
                    return value.data() != null;
                }).to("topic1", Produced.with(Serdes.String(), Serdes.String())

stream.map(logic2OnData).filter(
                (key, value) -> {
                    if (key == null || value == null)
                        return false;
                    return value.data() != null;
                }).to("topic2", Produced.with(Serdes.String(), Serdes.String())

有没有办法可以并行运行 stream.map(logc1OnData)... 和 stream.map(logic2OnData) ?看起来他们一个接一个地运行,即第一个映射被执行并写入topic1,然后第二个映射被执行并写入topic2 FYI ..我不想要num.threads.count,因为我的流输入来自单个主题和我正在运行同一应用程序的多个实例以从源主题主题中读取,以在使用时实现并行性。

我正在寻找的是在执行和写入不同主题时的并行性

4

1 回答 1

1

您正在查看的是您的操作添加到拓扑中的顺序。一旦拓扑被执行,记录器将按照它们到达的顺序流经拓扑,但在运行之前logic2OnData不会等待logic1OnData完成处理。

如果您担心性能,您可以考虑stream threads获得更多并行性。

编辑:看来我可能错过了这个问题。

单个子拓扑不允许您以并行方式运行每个分支。但是,您可以使用repartition()将 logic2OnData 制作成它自己的子拓扑,并且repartition()调用之后的所有内容都将能够与之前的所有内容并行运行。

于 2020-08-25T19:32:21.460 回答