我正在使用 Kafka Streams、Spring-Kafka 和 Spring Boot 编写流应用程序。我找不到任何信息如何在使用 Spring-Kafka 时正确测试 Kafka Streams DSL 完成的流处理。文档提到 EmbeddedKafkaBroker,但似乎没有关于如何处理例如状态存储的测试的信息。
只是为了提供一些我想测试的简单示例。我注册了以下 bean(其中 Item 是 avro 生成的):
@Bean
public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
return streamsBuilder
.stream(ITEM_TOPIC,
Consumed.with(Serdes.String(), itemAvroSerde))
.mapValues((id, item) -> item.getNumber())
.groupByKey()
.aggregate(
() -> 0L,
(id, number, agg) -> agg + number,
Materialized.with(Serdes.String(), Serdes.Long()));
}
测试所有项目编号是否汇总的正确方法是什么?