3

我发现这段代码成功读取了 kafka 主题并在屏幕上打印了每条消息。我想扩展它以对字符串执行其他操作,而不仅仅是在屏幕上打印。为此,我想了解迭代消息的 while 循环中发生了什么。it.hasNext() 有什么作用?它是否查找下一条消息或新消息列表。它什么时候会退出这个while循环?

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        **ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
          while (it.hasNext())**
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
           System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
4

2 回答 2

2

Kafka 迭代器 .hasNext() 方法通常总是返回 true !因此,您将永远不会脱离标准操作的循环。但是,如果发生异常,它可能会返回 false 并停止获取。

所以,一般来说,你会进入循环,并且对 it.next() 的调用实际上是阻塞的,直到有消息要消费,在这种情况下它将被返回。

于 2015-05-29T07:42:36.860 回答
1

尽管这是一个老问题,而且 kafka 0.8 可能很少在生产中使用,并且 kafka 0.10 提供了更丰富的 API,但我想补充一点,ConsumerIterator 的方法hasNext()是等待消息的阻塞操作。

因此,例如,如果您想使用消费者读取之前发送的所有数据并在完成时返回,您可以将属性设置"consumer.timeout.ms"为例如100导致在阻塞时间超过超时时hasNext()抛出一个。ConsumerTimeoutException

在您的代码中,只需在 while 循环中捕获异常:

try {
    while (it.hasNext()) {
        String message = new String(it.next().message()));
        // do sth with message
    }
} catch {
    case t: ConsumerTimeoutException => // empty
} finally {
    // close your consumer
}
于 2017-08-02T14:30:08.803 回答