
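We currently have 2 Kafka Streams topics that receive records continuously. We want to join the 2 streams by key after waiting for a 5-minute window, but with my current code I see records emitted immediately, without "waiting" to see whether a matching record arrives on the other stream. My current implementation: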

KStream<String, String> streamA =
    builder.stream(topicA, Consumed.with(Serdes.String(), Serdes.String()))
        .peek((key, value) -> System.out.println("Stream A incoming record key " + key + " value " + value));

KStream<String, String> streamB =
    builder.stream(topicB, Consumed.with(Serdes.String(), Serdes.String()))
        .peek((key, value) -> System.out.println("Stream B incoming record key " + key + " value " + value));


ValueJoiner<String, String, String> recordJoiner =
    (recordA, recordB) -> {
      if(recordA != null) {
        return recordA;
      } else {
        return recordB;
      }
    };

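KStream<String, String> combinedStream =
    // outerJoin is an assumption: the method name is missing above, and the joiner's null checks suggest an outer join
    streamA.outerJoin(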
        streamB,
        recordJoiner,
        JoinWindows
            .of(Duration.ofMinutes(5)),
        StreamJoined.with(
            Serdes.String(),
            Serdes.String(),
            Serdes.String()))
        .peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));

combinedStream.to("test-topic",
    Produced.with(
        Serdes.String(),
        Serdes.String()));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();

Although I have JoinWindows.of(Duration.ofMinutes(5)), I see some records emitted immediately. How can I make sure they are not?
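
To make the expectation concrete: as far as I understand the documentation, a join window is the maximum timestamp difference between two records with the same key for them to be paired, not a delay before output. The plain-Java sketch below models only that matching rule for an inner join. It is not Kafka Streams code; `WindowedJoinSketch`, `onA`, and `onB` are hypothetical names, the inclusive window bound is an assumption of the sketch, and it omits eviction of expired records and any handling of unmatched records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of an inner windowed join; not the Kafka Streams implementation. */
final class WindowedJoinSketch {

    /** A buffered record: its value and its event timestamp in milliseconds. */
    private static final class Entry {
        final String value;
        final long timestampMs;

        Entry(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowMs;
    private final Map<String, List<Entry>> bufferA = new HashMap<>();
    private final Map<String, List<Entry>> bufferB = new HashMap<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Feeds a record from side A and returns the pairs it completes. */
    List<String> onA(String key, String value, long timestampMs) {
        return process(bufferA, bufferB, key, value, timestampMs, true);
    }

    /** Feeds a record from side B and returns the pairs it completes. */
    List<String> onB(String key, String value, long timestampMs) {
        return process(bufferB, bufferA, key, value, timestampMs, false);
    }

    private List<String> process(Map<String, List<Entry>> own,
                                 Map<String, List<Entry>> other,
                                 String key, String value, long timestampMs,
                                 boolean fromA) {
        // Buffer the record so that later records from the other side can match it.
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(value, timestampMs));

        List<String> joined = new ArrayList<>();
        for (Entry candidate : other.getOrDefault(key, Collections.<Entry>emptyList())) {
            // Pair the records only if their timestamps are at most windowMs apart.
            if (Math.abs(timestampMs - candidate.timestampMs) <= windowMs) {
                joined.add(fromA ? value + "+" + candidate.value
                                 : candidate.value + "+" + value);
            }
        }
        // Matches are returned right away; nothing is held back until the window ends.
        return joined;
    }
}
```

In this model, a record on A at time 0 produces nothing by itself, and a record on B with the same key one minute later produces the pair at that moment, without waiting out the remaining four minutes. What I am after instead is for output to be held until the window has elapsed.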

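Also, is this the most efficient way to join 2 Kafka streams, or would it be better to write our own consumer implementation that reads from both streams?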
