我最近发现我一直在使用的一个主题是多分区而不是单分区。我需要重新配置我的消费者类来处理多个分区,但我有点困惑。我目前正在使用一个偏移组,test_offset_group
为了下面的示例,我们称之为它。正常情况下,我会一直线性解析,及时继续前行;随着消息被添加到主题中,我将解析它们并继续前进,但如果发生崩溃或需要返回并重新运行前一天的提要,我需要能够按时间戳进行搜索。 Kafka
在这个项目中是强制性的,所以我无法更改我正在使用的流数据服务的类型。
我像这样配置我的消费者:
test_consumer = KafkaConsumer("test_topic", bootstrap_servers="bootstrap_string", enable_auto_commit=False, group_id="test_offset_group"
如果我需要寻找时间戳,我将提供一个时间戳,然后使用以下方法寻找:
test_consumer.poll()
tp = TopicPartition("test_topic", 0)
needed_date = datetime.timestamp(timestamp)
rec_in = test_consumer.offsets_for_times({tp: needed_date * 1000})
test_consumer.seek(tp, rec_in[tp].offset)
上述功能非常适合单个分区使用者,但是当您考虑多个分区时,这感觉非常笨拙和困难。我想我可以使用获取分区的数量,
test_consumer.partitions_for_topic('test_topic")
然后遍历它们中的每一个,但是再一次,这似乎违背了 Kafka 的原则,我觉得应该有一种更简单的方法来做到这一点。
总结:我想了解如何使用 offset_group 功能寻找具有多个分区的多个偏移量,并且我想确认,通过执行上述操作,我实际上忽略了除 0 之外的所有分区?