我发现这段代码成功读取了 kafka 主题并在屏幕上打印了每条消息。我想扩展它以对字符串执行其他操作,而不仅仅是在屏幕上打印。为此,我想了解迭代消息的 while 循环中发生了什么。it.hasNext() 有什么作用?它是否查找下一条消息或新消息列表。它什么时候会退出这个while循环?
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
**ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())**
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}