1

我正在尝试使用高级消费者批量读取 Kafka 主题中的消息。在这批读取期间,我的线程必须在某个时候停止。

要么,一旦主题中的所有消息都用完。或在即将读取消息时获取最大偏移量并停止直到达到最大偏移量。

我尝试在高级消费者中使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息进来。

所以3个问题,

  1. 我怎么知道没有更多消息要从该主题中读取?

  2. 如果我对上述问题有答案,我该如何阻止它再听这个话题?

  3. 有没有办法在批量读取开始时找到最大偏移量(我认为简单的消费者可以做到这一点)并使高级消费者在那个时候停止?

4

1 回答 1

0

您可以选择在指定的时间内没有新消息到达时,您可以选择认为所有消息都已被阅读。这可以使用消费者属性进行设置consumer.timeout.ms。在这个指定的值过去之后没有任何新消息到达,ConsumerIterator 将抛出一个超时异常,您可以在消费者中处理它并退出。

于 2015-04-05T14:06:19.140 回答