3

我有一个问题,我的提交失败,因为 poll() 太长(为什么会发生这种情况,我不知道,没有消息,它只是在一个空队列上读取/提交,我的 poll-interval 设置为小时)。然后当它再次点击 read() 时,由于某种原因它不会重新平衡。然而,这只发生在我的代码在 bluemix 上运行时,当我在本地重现异常时,下一个 read() 会导致重新平衡。

从 CommitFailedException 中恢复的正确方法是什么?我应该 close() 并重新创建我的消费者吗?还是调用 read() 应该重新平衡并让我继续?

4

2 回答 2

1

@kyl,所以我相信使用默认的 kafka-java 客户端,消费者将每 3 秒心跳一次,会话超时为 10 秒,因此您的消费者应该留在组内而不会被取出并发生重新平衡。你的信息中包含什么信息CommitFailedException?我假设提交失败是因为您已被踢出局。

关于此的其他一些问题:

  1. 您是否有多个消费者来来去去,和/或您是否有意使用消费者群体而不仅仅是单个消费者?

  2. “我的轮询间隔设置为小时”是什么意思?

  3. “提交一个空队列”是什么意思?

你能分享一段你的消费者循环代码吗,因为这可能有助于更好地解释你在做什么

于 2017-07-28T10:39:11.597 回答
1

commitSync方法将无限期地自动重试,所以如果你得到CommitFailedException那么它不是一个可重试的条件,再次调用 commit 不太可能有帮助。您收到此异常是因为您的消费者已被踢出消费者组。

如果您commitAsync用于提交偏移量,则重试不是自动的,您可能会收到一个RetriableCommitFailedException指示潜在的暂时性错误,您可以再次手动重试提交。听起来这不是您的情况,但我将其包括在内是为了完整回答。

一旦你的消费者被踢出组并且你得到这个CommitFailedException异常,你可以继续调用 poll() 直到重新平衡完成并且你被允许回到消费者组(可能比以前使用一组新的分区),它会继续。

如果您的应用程序不能容忍您收到的分区(以及密钥)在中途更改的情况,那么您应该实现一个重新平衡侦听器,该侦听器将在分区分配更改时被调用。见http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html

如果您只是想解决偏移量每 24 小时到期的事实,那么除了定期调用 poll() 以保持在消费者组中之外,您还需要每天至少调用一次 commit 以使偏移量保持最新

于 2017-08-04T17:18:17.807 回答