1

这是我用来创建新消费者的基本消费者类。它适用于"enable.auto.commit":True消费者。但是,当我创建一个带有enable.auto.commit=False任何 (KeyDeserializationError, ValueDeserializationError) 异常的消费者时,我需要在 except 块中手动提交该消息。由于这个基类也将用于 auto-commit=True,所以这条线 self.consumer.commit() 也被这些类型的消费者调用。

  1. 通过为消费者调用commit()在内部给出任何问题?auto.commit=True(当我在本地尝试时似乎很好)
  2. KeyDeserializationError对 ( , ValueDeserializationError) 异常的理想处理应该是什么auto.commit=False
class KafkaConsumer(object):
    """Wrapper over Kafka Consumer"""
    def __init__(self, topics: list[str], **kwargs: Any):
        config = {
            **kwargs,
        }
        self.consumer = DeserializingConsumer(config)
        self.consumer.subscribe(topics=topics)

    def consume(self, poll_timeout_secs: float = 1.0):
        try:
            while True:
                try:
                    msg = self.consumer.poll(timeout=poll_timeout_secs)
                except (KeyDeserializationError, ValueDeserializationError) as err:
                    self.consumer.commit()
                    continue
                if msg is None:
                    continue
                if msg.error():
                    raise KafkaException(msg.error())
                else:
                    yield msg
        except:
            self.consumer.close()

# create consumer object auto.commit=True/False

kafka_consumer = KafkaConsumer(topics=topics, **kwargs) # i can pass "enable.auto.commit":False for manual commit mode.

# Actual consuming business logic

for message in kafka_consumer.consume():
    try:
        event = message.value()
        logger.info(f"message {event}")
    except Exception as e:
        logger.exception(f'Unable to consume data from kafka {e}')
finally:
pass
# kafka_consumer.consumer.commit(message=message) # in case of manual commit consumer mode
4

0 回答 0