我正在使用跨多台机器的通用消费者组管理一个 kafka 队列。现在我还需要显示队列的当前内容。我如何仅读取组中尚未读取的那些消息,同时使这些消息再次被实际处理这些消息的组中的其他消费者读取。任何帮助,将不胜感激。
4 回答
在 Kafka 中,从主题中“读取”消息的概念和“消费”它们的概念是相同的。在高层次上,使消费者无法使用“已消费”消息的唯一原因是消费者将其读取偏移量设置为超出相关消息的值。因此,您可以关闭消费者的自动提交功能,并避免在您只想“读取”而不是“消费”的情况下提交偏移量。
获取“所有尚未读取的消息”的一个很好的代理是将最新提交的偏移量与每个分区的高水位标记偏移量进行比较。这提供了一个“滞后”的概念,指示给定消费者在其对分区的消费方面落后多远。pykafka 中的fetch_consumer_lag
CLI 函数是如何做到这一点的一个很好的例子。
在 Kafka 中,一个分区只能由一个组中的一个消费者使用,即如果您的主题有 10 个分区并且您产生了 20 个具有相同 groupId 的消费者,那么只有 10 个将连接到 Kafka,其余 10 个将处于空闲状态。只有在现有消费者之一死亡或未从主题轮询的情况下,Kafka 才会识别新消费者。
AFAIK,我认为您无法在消费者群体中做我理解的您想做的事情。您显然可以根据第一个消费者组收集的信息创建另一个 groupId 并处理消息。
卡夫卡现在有一个KStream.peek()
方法
请参阅提案“添加 KStream peek 方法”。
从文档中我并不是 100% 清楚这可以防止消耗从主题中窥视的消息,但我看不出如何以任何安全、可靠的方式使用它,除非它这样做。
也可以看看:
我认为您可以使用发布订阅模型。然后每个消费者都有自己的偏移量,并且可以为自己消费所有消息。