10

我正在使用以下代码从主题中读取消息。我面临两个问题。每当我启动消费者时,它正在读取队列中的所有消息?如何只阅读未读消息?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
4

1 回答 1

11

正如@Kenji 所说,您必须使用consumer.commit(). 如果您不想手动提交,可以通过传递enable_auto_commit=True给您的KafkaConsumer. 您可能还想调整auto_commit_interval_ms每次自动提交之间的时间间隔(以毫秒为单位)。见这里: http: //kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html

于 2016-01-09T12:31:19.967 回答