这是我用来创建新消费者的基本消费者类。它适用于"enable.auto.commit":True
消费者。但是,当我创建一个带有enable.auto.commit=False
任何 (KeyDeserializationError, ValueDeserializationError) 异常的消费者时,我需要在 except 块中手动提交该消息。由于这个基类也将用于 auto-commit=True,所以这条线 self.consumer.commit() 也被这些类型的消费者调用。
- 通过为消费者调用commit()在内部给出任何问题?
auto.commit=True
(当我在本地尝试时似乎很好) KeyDeserializationError
对 ( ,ValueDeserializationError
) 异常的理想处理应该是什么auto.commit=False
?
class KafkaConsumer(object):
"""Wrapper over Kafka Consumer"""
def __init__(self, topics: list[str], **kwargs: Any):
config = {
**kwargs,
}
self.consumer = DeserializingConsumer(config)
self.consumer.subscribe(topics=topics)
def consume(self, poll_timeout_secs: float = 1.0):
try:
while True:
try:
msg = self.consumer.poll(timeout=poll_timeout_secs)
except (KeyDeserializationError, ValueDeserializationError) as err:
self.consumer.commit()
continue
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
yield msg
except:
self.consumer.close()
# create consumer object auto.commit=True/False
kafka_consumer = KafkaConsumer(topics=topics, **kwargs) # i can pass "enable.auto.commit":False for manual commit mode.
# Actual consuming business logic
for message in kafka_consumer.consume():
try:
event = message.value()
logger.info(f"message {event}")
except Exception as e:
logger.exception(f'Unable to consume data from kafka {e}')
finally:
pass
# kafka_consumer.consumer.commit(message=message) # in case of manual commit consumer mode