我正在使用 pykafka 不断向主题生成消息
producer.produce('test')
我想收到最新的消息。我在 pykafka Github 页面上找到了一个解决方案,它建议:
client = KafkaClient(hosts="xxxxxxx")
topic = client.topics['mytopic']
consumer = topic.get_simple_consumer(
auto_offset_reset=OffsetType.LATEST,
reset_offset_on_start=True)
LAST_N_MESSAGES = 2
offsets = [(p, op.next_offset - LAST_N_MESSAGES) for p, op in consumer._partitions.iteritems()]
consumer.reset_offsets(offsets)
consumer.consume()
但是,我真的不明白这里发生了什么,如果那里至少有两条消息,它只会获取最新消息。
有没有更强大的解决方案?