1

我使用 Streams DSL 加入了 2 个主题(实际上更多,但在这里保持简单),一旦加入,就会将数据发布到下游。 

我正在主题 1 之上创建一个 KTable 并将其存储到一个命名的状态存储中。Topic1 的键如下所示:

{  sourceCode:"WXYZ",
    platformCode:"ABCD",
    transactionIdentifier:"012345:01:55555:12345000:1"
}

我按预期看到了变更日志主题中的数据。  

在主题 2 之上有一个 KStream。主题 2 的密钥如下所示:

{  sourceCode:"WXYZ",
   platformCode:"ABCD",
   transactionIdentifier:"012345:01:55555:12345000:1"
   lineIdentifier:"1"
}

  我正在重新键入和聚合来自主题 2 的数据并将其放入另一个命名状态存储中,因为 topic1 和 topic2 中的数据之间存在 1-Many 关系。重新键入数据后,主题 2 中的键看起来与主题 1 的键相同。我可以看到重新分区主题中重新键入的数据以及变更日志主题中的聚合数据。但是,连接不会被触发。

其他关键细节——</p>

  1. 所有主题中的数据都是 Avro 格式。
  2. 我正在使用 Java/Spring Boot。
  3. 我在commit.interval.mscache.max.bytes.buffering上保留了默认设置

任何指向我在这里可能做错的事情?

编辑 1:我查看了数据分区,看起来一个在 14 上,另一个在 20 上。我还发现了一个类似的问题

编辑 2:topic1 和 topic2 的生产者是一个 golang 应用程序。流恢复消费者具有以下配置:

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

流消费者具有以下配置:

partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]

4

1 回答 1

0

我在下面发布答案,以帮助其他人从此类问题中寻找涅槃。正如链接问题的评论部分所指出的,这是由于生产者应用程序造成的问题。

Producer 应用程序是用golang编写的,因此,它的散列不同于Java,这是我使用 Streams DSL 连接数据时使用的。

早些时候,这就是我阅读 KTable 的方式,它保持与源主题中相同的分区:

@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(inputTopic1, Materialized.as(transactionStore));
}

我重写了如下代码,以达到预期的效果:

@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {

    SpecificAvroSerde<MyKey> keySpecificAvroSerde = myKeySpecificAvroSerde();
    SpecificAvroSerde<MyValue> valueSpecificAvroSerde = mySpecificAvroSerde();

    streamsBuilder.stream(inputTopic1, Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde)).
            selectKey((key, value) -> new MyKey(key.get1(), key.get2(), key.get3())).
        to("dummyTopic", Produced.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    return streamsBuilder.table("dummyTopic",
            Materialized.<MyKey, MyValue, KeyValueStore<Bytes, byte[]>>as("myStateStore").
                   withKeySerde(keySpecificAvroSerde).withValueSerde(valueSpecificAvroSerde));
}
于 2020-06-29T21:33:43.960 回答