2

我正在尝试用 Java 实现一个简单的 Producer-->Kafka-->Consumer 应用程序。我能够成功地生产和消费消息,但是当我重新启动消费者时会出现问题,其中一些已经消费的消息再次被消费者从 Kafka 中获取(不是所有消息,而是最后的一些消息)消费的消息)。

我已autooffset.reset=largest在我的消费者中设置,我的autocommit.interval.ms属性设置为 1000 毫秒。

这是“重新传递一些已使用的消息”是一个已知问题,还是我在这里缺少任何其他设置?

基本上,有没有办法确保以前消费的消息都不会被消费者拾取/消费?

4

1 回答 1

3

Kafka 使用 Zookeeper 来存储消费者偏移量。由于 Zookeeper 的操作非常缓慢,因此不建议在消费完每条消息后提交偏移量。

可以向消费者添加关闭钩子,以在退出前手动提交主题偏移量。但是,这在某些情况下(如 jvm 崩溃或kill -9)无济于事。为了防止这种情况,我建议实现自定义提交逻辑,在处理每条消息(文件或本地数据库)后在本地提交偏移量,并且每 1000 毫秒向 Zookeeper 提交偏移量。在消费者启动时,这两个位置都应该被查询,并且最多应该使用两个值作为消费偏移量。

于 2013-05-01T08:33:00.443 回答