该commitSync
方法将无限期地自动重试,所以如果你得到CommitFailedException
那么它不是一个可重试的条件,再次调用 commit 不太可能有帮助。您收到此异常是因为您的消费者已被踢出消费者组。
如果您commitAsync
用于提交偏移量,则重试不是自动的,您可能会收到一个RetriableCommitFailedException
指示潜在的暂时性错误,您可以再次手动重试提交。听起来这不是您的情况,但我将其包括在内是为了完整回答。
一旦你的消费者被踢出组并且你得到这个CommitFailedException
异常,你可以继续调用 poll() 直到重新平衡完成并且你被允许回到消费者组(可能比以前使用一组新的分区),它会继续。
如果您的应用程序不能容忍您收到的分区(以及密钥)在中途更改的情况,那么您应该实现一个重新平衡侦听器,该侦听器将在分区分配更改时被调用。见http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
如果您只是想解决偏移量每 24 小时到期的事实,那么除了定期调用 poll() 以保持在消费者组中之外,您还需要每天至少调用一次 commit 以使偏移量保持最新