0

我从 Kafka 代理读取的 python 融合 kafka 代码如下所示

self.consumer = Consumer(
            {
                "auto.offset.reset": "earliest",
                "enable.auto.commit": False,
            }
        )

while True:
        msg = self.consumer.poll(timeout=5)
        log.info(f"Before commit  {msg.topic()} {msg.partition()} 
        {msg.offset()}")
        #Before commit  stream-seg 2 6476

        self.consumer.commit(asynchronous=False)

        log.info(f"After commit  {msg.topic()} {msg.partition()} 
        {msg.offset()}")
        #After commit  stream-seg 2 6476

如上所见,提交前后的 msg.offset() 是相同的。我应该在执行 consumer.commit() 时也提交偏移值/分区还是我错过了什么

4

1 回答 1

1

您永远不会更改msg变量,因此不会对其属性进行任何修改作为提交的副作用。

于 2021-09-20T20:09:15.987 回答