8

我正在做 Python Kafka 消费者(尝试在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer )。当我运行以下代码时,它会一直运行,即使所有消息都已消耗。我希望消费者在消费完所有消息后会停止。怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Consumer 中)。

更新

我使用信号处理程序来调用 consumer.stop()。一些错误信息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过client.close()。但同样的结果。

我需要一些方法来优雅地停止 for 循环。

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

欢迎任何帮助。谢谢。

4

4 回答 4

8

我们可以先查看主题中最后一条消息的偏移量。然后当我们达到那个偏移量时停止循环。

    client = "localhost:9092"
    consumer = KafkaConsumer(client)
    topic = 'test'
    tp = TopicPartition(topic,0)
    #register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)

    consumer.seek_to_beginning(tp)        

    for message in consumer:
        print "Offset:", message.offset
        print "Value:", message.message.value
        if message.offset == lastOffset - 1:
            break
于 2017-08-01T06:04:45.943 回答
2

使用 iter_timeout 参数设置等待时间。如果设置为 10,就像下面这段代码一样,如果 10 秒内没有新消息进来,它将退出。默认值为None,这意味着即使没有新消息进来,消费者也会在这里阻塞。

        self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)

更新

上面的方法不是很好。当大量消息进来时,很难设置足够小的 iter_timeout 来保证停止。所以,现在,我正在使用 get_message() 函数,它尝试使用一条消息并停止。没有新消息时返回 None。

于 2015-08-06T15:52:22.980 回答
1

与 Mohit 的答案类似的解决方案,但使用end_offsets了消费者的功能。

from kafka import KafkaConsumer, TopicPartition

# settings
client = "localhost:9092"
topic = 'test'

# prepare consumer
tp = TopicPartition(topic,0)
consumer = KafkaConsumer(client)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  

# obtain the last offset value
lastOffset = consumer.end_offsets([tp])[tp]

for message in consumer:
    print "Offset:", message.offset
    print "Value:", message.message.value
    if message.offset == lastOffset - 1:
        break
于 2019-01-10T10:27:44.690 回答
0

更简单的解决方案:

poll()改为使用poll_timeout_ms. poll()是非阻塞调用。

  • 在您的 while 循环之外创建一个计数器变量。
  • 每次 poll() 从 Kafka Brokers 获取 0 条记录时,增加计数器。
  • poll()如果提取记录,则将计数器重置为 0
  • 如果计数器 == 某个阈值(例如 10),则跳出循环并关闭消费者。

在这个逻辑中,我们依赖于这样一个事实,即如果poll()在随后的 10 次调用中没有获取任何记录,这意味着我们已经读取了所有数据。

于 2021-10-21T15:35:35.787 回答