0

我最近发现我一直在使用的一个主题是多分区而不是单分区。我需要重新配置我的消费者类来处理多个分区,但我有点困惑。我目前正在使用一个偏移组,test_offset_group为了下面的示例,我们称之为它。正常情况下,我会一直线性解析,及时继续前行;随着消息被添加到主题中,我将解析它们并继续前进,但如果发生崩溃或需要返回并重新运行前一天的提要,我需要能够按时间戳进行搜索。 Kafka在这个项目中是强制性的,所以我无法更改我正在使用的流数据服务的类型。

我像这样配置我的消费者:

test_consumer = KafkaConsumer("test_topic", bootstrap_servers="bootstrap_string", enable_auto_commit=False, group_id="test_offset_group"

如果我需要寻找时间戳,我将提供一个时间戳,然后使用以下方法寻找:

test_consumer.poll()

tp = TopicPartition("test_topic", 0)

needed_date = datetime.timestamp(timestamp)

rec_in = test_consumer.offsets_for_times({tp: needed_date * 1000})

test_consumer.seek(tp, rec_in[tp].offset)

上述功能非常适合单个分区使用者,但是当您考虑多个分区时,这感觉非常笨拙和困难。我想我可以使用获取分区的数量, test_consumer.partitions_for_topic('test_topic") 然后遍历它们中的每一个,但是再一次,这似乎违背了 Kafka 的原则,我觉得应该有一种更简单的方法来做到这一点。

总结:我想了解如何使用 offset_group 功能寻找具有多个分区的多个偏移量,并且我想确认,通过执行上述操作,我实际上忽略了除 0 之外的所有分区?

4

1 回答 1

1

您正在执行正确的逻辑,您只需要在分配给此使用者实例的所有分区上执行它。

您可以使用 检索当前分配assignment()

于 2020-12-12T20:21:43.257 回答