2

我是卡夫卡的新手。通过一些 kafka-python 在线教程的帮助,我编写了以下代码:

   from kafka import SimpleProducer, KafkaClient, KafkaConsumer
   kafka =  KafkaClient("localhost:9092")
   producer = SimpleProducer(kafka)
   producer.send_messages(b'my-topic', b'this method', b'Hello World')
   consumer = KafkaConsumer('my-topic',
                     group_id='my_group',
                     bootstrap_servers=['localhost:9092'])
   for message in consumer:
       print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

但问题是,在最后一个 for 循环中,代码执行被卡住了,我无法弄清楚。

4

1 回答 1

2

您的原始代码是正确的。我运行你的代码。KafkaConsumer 可用于消费消息。如果您打开另一个控制台,并运行相同的原始代码,您将看到输出。

http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中有很多 Consumer 类,例如 SimpleConsumer、KafkaConsumer、Consumer 等。为什么你的代码卡在 for 循环中?因为你代码中的消费者默认设置为消费新消息。在这种情况下,producer.send_messages() 函数产生的消息不会被消费者消费。

顺便说一句,如果你使用 SimpleConsumer,你可以使用 seek() 来设置你想消费的消息的偏移量。

于 2015-08-06T15:05:40.953 回答