我正在尝试使用高级消费者批量读取 Kafka 主题中的消息。在这批读取期间,我的线程必须在某个时候停止。
要么,一旦主题中的所有消息都用完。或在即将读取消息时获取最大偏移量并停止直到达到最大偏移量。
我尝试在高级消费者中使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息进来。
所以3个问题,
我怎么知道没有更多消息要从该主题中读取?
如果我对上述问题有答案,我该如何阻止它再听这个话题?
有没有办法在批量读取开始时找到最大偏移量(我认为简单的消费者可以做到这一点)并使高级消费者在那个时候停止?