我正在使用以下脚本从 Kafka 主题中读取消息。我能够阅读消息。但是,偏移量不会提交。关于如何强制偏移提交的任何建议,或者目前可能阻止提交的任何建议?(注意:这基本上取自文档,您可以看到它与他们的平衡消费者示例非常相似。)
我正在使用 Python 2.7.8、pykafka 2.1.1 和 kafka 0.8.2.1。
谢谢你。
from pykafka import KafkaClient
client = KafkaClient(hosts='myHost:9092')
topic = client.topics['myTopic']
balanced_consumer = topic.get_balanced_consumer(
reset_offset_on_start=False,
consumer_group='myTestConsumerGroup',
auto_commit_enable=True,
auto_commit_interval_ms=1000,
zookeeper_connect='myZookeeperHost')
for message in balanced_consumer:
if message is not None:
print(str(message.offset) + '\t' + message.value)
balanced_consumer.commit_offset()