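I have an embedded Kafka instance running as part of a test. I am trying to verify that all messages have been read, but I get an empty result from the Kafka admin client.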
Map<TopicPartition, OffsetAndMetadata> partitionOffset = embeddedKafkaRule.getEmbeddedKafka().doWithAdminFunction(admin -> {
    try {
        return admin.listConsumerGroupOffsets(COUNTER_GROUP).partitionsToOffsetAndMetadata().get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
The map is always empty. I have tried acknowledging every record and setting a 100 ms offset commit interval to see whether that makes any difference, but no luck.
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaRule.getEmbeddedKafka()
.getBrokersAsString());
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.destination", COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.bindings.enricher-in-0.group", COUNTER_GROUP);
System.setProperty("spring.cloud.stream.bindings.enricher-out-0.destination", ENRICHED_COUNTER_TOPIC);
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.consumer.ackEachRecord", "true");
System.setProperty("spring.cloud.stream.kafka.bindings.enricher-in-0.autoCommitOffset", "true");
System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms", "100");
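To make the goal concrete, here is a minimal sketch of the "all messages have been read" check that the offsets map is meant to feed. It is an illustration only: the class name OffsetCheck, the method allConsumed, and the use of plain String keys and Long offsets in place of TopicPartition and OffsetAndMetadata are assumptions made so the example runs without the Kafka client on the classpath. The end offsets are assumed to be fetched separately.

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka or Spring: compares committed
// offsets against log end offsets, both keyed by a partition label.
public class OffsetCheck {

    // Returns true when every non-empty partition in endOffsets has a
    // committed offset at or beyond its log end offset. A partition that is
    // missing from the committed map counts as not consumed.
    static boolean allConsumed(Map<String, Long> committed, Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long c = committed.get(end.getKey());
            if (end.getValue() > 0 && (c == null || c < end.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> end = Map.of("counter-0", 5L);
        System.out.println(allConsumed(Map.of("counter-0", 5L), end)); // prints true
        System.out.println(allConsumed(Map.of(), end));                // prints false
    }
}
```

With an empty committed map, as in my test, this check fails for any topic that holds at least one record, which is exactly the symptom I am seeing.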