我正在使用来自 Kafka 站点的 ConsumerGroupExample 代码测试 Kafka 高级消费者。我想检索 Kafka 服务器配置中名为“test”的主题的所有现有消息。查看其他博客,auto.offset.reset 应该设置为“最小”才能获取所有消息:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
我真正遇到的问题是:高级消费者的等效 Java api 调用是什么,相当于:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning