1

我来寻求奥术知识。

首先,我有两对主题,每对中的一个主题馈入另一个主题。后面的主题正在形成两个KTable,用于KTable+KTable leftJoin。问题是,当我为任一 KTable 生成一条记录时,leftJoin 生成了三条记录。我希望表单中有两条记录(A-null,AB),但我得到的是(A-null,AB,A-null)。我已经确认 KTables 每个都收到一条记录。

我摆弄了 CACHE_MAX_BYTES_BUFFERING_CONFIG 来启用/禁用状态存储缓存。上述行为是将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 0。当我使用 CACHE_MAX_BYTES_BUFFERING_CONFIG 的默认值时,我看到连接输出以下记录:(AB, AB, A-null)

以下是流、消费者、生产者的配置:

properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // fiddled with
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

遇到此行为的处理器 API 代码(已清理)如下,请注意配对的主题 [A1, A2] 和 [B1, B2]:

    KTable<Long, Value> kTableA =
        kstreamBuilder.table(longSerde, valueSerde, topicA2);

    kstreamBuilder.stream(keySerde, envelopeSerde, topicA1)
        .to(longSerde, valueSerde, topicA2);

    kstreamBuilder.stream(keySerde, envelopeSerde, topicB1)
        .to(longSerde, valueSerde, topicB2.topicName);

    KTable<Long, Value> kTableB =
        kstreamBuilder.table(longSerde, valueSerde, topicB2.topicName);

    KTable<Long, Result> joinTable = kTableA.leftJoin(kTableB, (a,b) -> {
        // value joiner called three times with only a single record input
        // into topicA1 and topicB1
    });

    joinTable.groupBy(...)
    .aggregate(...)
    .to(longSerde, aggregateSerde, outputTopic);

预先感谢您的任何帮助,哦,仁慈的人。

更新: 我正在使用一个 kafka 服务器和每个主题 1 个分区运行,并且遇到了这种行为。当我将服务器数量增加到 2 并将分区数量增加到 3 时,我的输出变为(A-null)。

在我看来,我需要花更多时间阅读 kafka 手册......

4

0 回答 0