我正在做 Python Kafka 消费者(尝试在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer )。当我运行以下代码时,它会一直运行,即使所有消息都已消耗。我希望消费者在消费完所有消息后会停止。怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Consumer 中)。
更新
我使用信号处理程序来调用 consumer.stop()。一些错误信息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过client.close()。但同样的结果。
我需要一些方法来优雅地停止 for 循环。
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
欢迎任何帮助。谢谢。