7

我正在使用来自 Kafka 站点的 ConsumerGroupExample 代码测试 Kafka 高级消费者。我想检索 Kafka 服务器配置中名为“test”的主题的所有现有消息。查看其他博客,auto.offset.reset 应该设置为“最小”才能获取所有消息:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)    {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");     

    return new ConsumerConfig(props);
}

我真正遇到的问题是:高级消费者的等效 Java api 调用是什么,相当于:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

4

4 回答 4

7

基本上,每当一个新的消费者尝试消费一个主题时,它都会从头开始读取消息。如果您每次都特别从头开始消费以进行测试,那么每次您使用新的 groupID 初始化消费者时,它都会从头开始读取消息。我是这样做的:

properties.put("group.id", UUID.randomUUID().toString());

每次都从头开始阅读消息!

于 2015-11-19T07:28:43.747 回答
5

看起来您需要使用“低级 SimpleConsumer API”

对于大多数应用程序来说,高级消费者 API 已经足够好了。一些应用程序希望功能尚未暴露给高级消费者(例如,在重新启动消费者时设置初始偏移量)。他们可以改为使用我们的低级 SimpleConsumer Api。逻辑会稍微复杂一些,您可以按照此处的示例进行操作。

此示例用于从具有以下参数的主题中获取所有消息:(请注意,该端口是 Kafka 端口,而不是 ZooKeeper 端口,此示例中设置的主题):

10 my-replicated-topic 0 localhost 9092

具体来说,有一种方法可以获取 readOffset,它需要 kafka.api.OffsetRequest.EarliestTime():

long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

这是另一篇文章可能会提供一些关于如何解决这个问题的替代想法:如何从 Kafka 中的旧偏移点获取数据?

于 2014-02-18T00:44:20.120 回答
2

要从头开始获取消息,您可以执行以下操作:

import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath("zkhost:zkport", "/consumers/group.id");

然后就按照日常工作...

于 2015-12-09T04:14:21.723 回答
0
 Properties props = new Properties(); 
 props.put("bootstrap.servers", "localhost:9092");
 props.put("auto.offset.reset", "earliest");
 props.put("group.id", UUID.randomUUID().toString());

此属性将为您提供帮助。

于 2017-04-25T08:29:38.337 回答