1

我正在使用 pykafka 不断向主题生成消息

producer.produce('test')

我想收到最新的消息。我在 pykafka Github 页面上找到了一个解决方案,它建议:

client = KafkaClient(hosts="xxxxxxx")
topic = client.topics['mytopic']
consumer = topic.get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)
LAST_N_MESSAGES = 2
offsets = [(p, op.next_offset - LAST_N_MESSAGES) for p, op in consumer._partitions.iteritems()]
consumer.reset_offsets(offsets)
consumer.consume()

但是,我真的不明白这里发生了什么,如果那里至少有两条消息,它只会获取最新消息。

有没有更强大的解决方案?

4

1 回答 1

4

准确定义“最新消息”的含义很重要。在具有多个分区的 Kafka 主题中,如果不检查消息内容,实际上不可能知道每个分区上的哪些最新消息是全局最新消息。定义何时获取最新消息也很重要——您现在想要它们一次吗?您想从最近的消息开始消费,然后在消息被添加到主题时继续消费吗?您想定期只获取最新的 N 条消息吗?

您在上面包含的配方(我为 PyKafka 文档编写的基础)为您提供了每个分区的最后 N 条消息,供您选择 N。如果您只想获取最后一条消息,您可以简单地设置LAST_N_MESSAGES为 1。基本上recipe 检查每个分区消耗的最新偏移量,然后将消费者的偏移量重置为恰好LAST_N_MESSAGES在此之前。当您从这一点开始消费时,您只会获得分区的最后 N 条消息。

综上所述,如果您只是对从主题结束开始消费感兴趣,您可以使用以下命令:

consumer = topic.get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)

并开始正常消费。

于 2017-09-14T18:46:03.603 回答