6

I have a Kafka stream that takes data from a topic, and needs to filter that information to two different topics.

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

However, when I do it like this, it reads the data from the topic twice -- not sure if that has any impact on performance as the data gets larger. Is there a way to just filter it once and push it to two topics?

4

1 回答 1

6

您的方法是正确的,并且没有从主题中读取两次数据,也没有进行内部数据复制。您的方法的唯一缺点是,两个过滤器谓词都针对每条记录进行评估——但是,这很便宜,不应该是性能问题。

但是,您仍然可以通过使用KStream#branch()它来提高性能,它确实需要多个谓词并逐个评估所有谓词并为每个谓词返回一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不会为该单个记录评估进一步的谓词 - 这确保每条记录都被添加到最大一个输出流中;或者在以下情况下被删除没有谓词匹配)。

因此,您可以只提供两个谓词branch():第一个谓词与原始filter()谓词相同,第二个谓词总是返回true

KStream<String, Model> stream = builder.stream(
    Serdes.String(),
    specificAvroSerde,
    "not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key,value),
    (key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

不确定此代码是否比您的原始版本更具可读性。我想这是一个品味问题,我个人更喜欢您的原始代码,因为它确实更好地表达了语义。

我添加的版本应该稍微提高 CPU 效率,因为对于所有满足谓词的记录,它只评估一次。并且对于所有不满足结果的记录,true将返回一个简单的(即,没有第二个谓词评估)。

如果您知道大多数记录将以 结尾splitStream[1],您还可以反转谓词(并splitStream[0]用作“坏流”)以减少对第二个true返回谓词的调用次数。但这些只是微优化,应该无关紧要。

于 2016-12-01T22:15:03.887 回答