我正在使用Kafka Utils来测试某个基于 kafka 的消息传递系统。我想在不使用kafka-console-consumer.sh脚本的情况下找出特定主题中的消息数。我似乎找不到基于 KafkaTestUtils 的方式或 java 中的任何方式来帮助我实现这一目标。其他类似问题的答案都没有帮助我。
问问题
455 次
1 回答
1
以下应该有效:
Properties properties = ...
// omitted for the sake of brevity
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(topic);
consumer.poll(Duration.ofSeconds(10L)); // or some time
AtomicLong count = new AtomicLong();
consumer.endOffsets(consumer.assignment()).forEach( (topicPartition, endOffsetOfPartition) -> {
count.addAndGet(endOffsetOfPartition);
});
// decrement in case of retention as pointed out by Mickael
consumer.beginningOffsets(consumer.assignment()).forEach( (topicPartition, startOffsetOfPartition) -> {
count.set(count.get() - startOffsetOfPartition);
}));
System.out.println(count.get());
您获得每个分区的结束偏移量,并将每个分区的结束偏移量添加到计数中,因为没有。主题中的消息数等于否。该主题的所有分区中的消息。
于 2019-09-04T12:56:08.520 回答