我使用 Streams DSL 加入了 2 个主题(实际上更多,但在这里保持简单),一旦加入,就会将数据发布到下游。
我正在主题 1 之上创建一个 KTable 并将其存储到一个命名的状态存储中。Topic1 的键如下所示:
{ sourceCode:"WXYZ",
platformCode:"ABCD",
transactionIdentifier:"012345:01:55555:12345000:1"
}
我按预期看到了变更日志主题中的数据。
在主题 2 之上有一个 KStream。主题 2 的密钥如下所示:
{ sourceCode:"WXYZ",
platformCode:"ABCD",
transactionIdentifier:"012345:01:55555:12345000:1"
lineIdentifier:"1"
}
我正在重新键入和聚合来自主题 2 的数据并将其放入另一个命名状态存储中,因为 topic1 和 topic2 中的数据之间存在 1-Many 关系。重新键入数据后,主题 2 中的键看起来与主题 1 的键相同。我可以看到重新分区主题中重新键入的数据以及变更日志主题中的聚合数据。但是,连接不会被触发。
其他关键细节——</p>
- 所有主题中的数据都是 Avro 格式。
- 我正在使用 Java/Spring Boot。
- 我在commit.interval.ms和cache.max.bytes.buffering上保留了默认设置
任何指向我在这里可能做错的事情?
编辑 1:我查看了数据分区,看起来一个在 14 上,另一个在 20 上。我还发现了一个类似的问题。
编辑 2:topic1 和 topic2 的生产者是一个 golang 应用程序。流恢复消费者具有以下配置:
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
流消费者具有以下配置:
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]