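We currently have two Kafka topics that receive records continuously. We want to join the two streams by key after waiting out a 5-minute window, but with my current code I see records emitted immediately, without "waiting" to see whether a matching record arrives on the other stream. My current implementation: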
// Source streams; peek() only logs each incoming record.
KStream<String, String> streamA =
    builder.stream(topicA, Consumed.with(Serdes.String(), Serdes.String()))
        .peek((key, value) -> System.out.println("Stream A incoming record key " + key + " value " + value));
KStream<String, String> streamB =
    builder.stream(topicB, Consumed.with(Serdes.String(), Serdes.String()))
        .peek((key, value) -> System.out.println("Stream B incoming record key " + key + " value " + value));
// Prefer the record from stream A; fall back to stream B when A is missing.
ValueJoiner<String, String, String> recordJoiner =
    (recordA, recordB) -> {
        if (recordA != null) {
            return recordA;
        } else {
            return recordB;
        }
    };
// Outer join, since recordJoiner handles a null recordA.
KStream<String, String> combinedStream =
    streamA.outerJoin(
            streamB,
            recordJoiner,
            JoinWindows
                .of(Duration.ofMinutes(5)),
            StreamJoined.with(
                Serdes.String(),
                Serdes.String(),
                Serdes.String()))
        .peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));
combinedStream.to("test-topic",
    Produced.with(
        Serdes.String(),
        Serdes.String()));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
Although I have JoinWindows.of(Duration.ofMinutes(5)), I see some records being emitted immediately. How can I make sure they are not?
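To make the behaviour I am after concrete, here is a minimal plain-Java sketch with no Kafka dependency. It is only an illustration of the semantics I expect, not how Kafka Streams implements joins. The class name WindowedJoinSketch, its methods, and the "key:a|b" output format are all hypothetical. It assumes at most one waiting record per key and uses the record timestamps as the clock.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "wait up to 5 minutes, then emit"; not Kafka Streams code.
class WindowedJoinSketch {
    static final long WINDOW_MS = 5 * 60 * 1000L; // same 5 minutes as the JoinWindows above

    // A record that is still waiting for its partner from the other stream.
    static final class Pending {
        final boolean fromA;
        final String value;
        final long timestampMs;

        Pending(boolean fromA, String value, long timestampMs) {
            this.fromA = fromA;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Insertion-ordered so expired records are emitted in arrival order.
    private final Map<String, Pending> waiting = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    // Feed one record; fromA says which of the two streams it came from.
    void onRecord(boolean fromA, String key, String value, long timestampMs) {
        flushExpired(timestampMs);
        Pending other = waiting.get(key);
        if (other != null && other.fromA != fromA) {
            // Partner arrived inside the window: emit the joined pair now.
            waiting.remove(key);
            String a = fromA ? value : other.value;
            String b = fromA ? other.value : value;
            emitted.add(key + ":" + a + "|" + b);
        } else {
            // No partner yet: hold the record back instead of emitting it.
            // Simplification: a second record from the same side replaces the first.
            waiting.put(key, new Pending(fromA, value, timestampMs));
        }
    }

    // Emit unmatched records only once more than WINDOW_MS has passed since they arrived.
    void flushExpired(long nowMs) {
        Iterator<Map.Entry<String, Pending>> it = waiting.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (nowMs - p.timestampMs > WINDOW_MS) {
                emitted.add(entry.getKey() + ":" + (p.fromA ? p.value + "|null" : "null|" + p.value));
                it.remove();
            }
        }
    }
}
```

With this sketch, a record from stream A at time 0 followed by a record from stream B with the same key one minute later produces a single joined result, and nothing is emitted in between. A record whose partner never shows up is emitted with a null partner only after its 5 minutes have elapsed.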
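Also, is this the most efficient way to join two Kafka streams, or would it be better to write our own consumer implementation that reads from both streams?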