我从 Kafka 代理读取的 python 融合 kafka 代码如下所示
self.consumer = Consumer(
{
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
}
)
while True:
msg = self.consumer.poll(timeout=5)
log.info(f"Before commit {msg.topic()} {msg.partition()}
{msg.offset()}")
#Before commit stream-seg 2 6476
self.consumer.commit(asynchronous=False)
log.info(f"After commit {msg.topic()} {msg.partition()}
{msg.offset()}")
#After commit stream-seg 2 6476
如上所见,提交前后的 msg.offset() 是相同的。我应该在执行 consumer.commit() 时也提交偏移值/分区还是我错过了什么