我对卡夫卡很陌生。我正在创建两个主题并从两个生产者那里发布这两个主题。我有一个消费者消费来自两个主题的消息。这是因为我想按照优先级进行处理。
我从两个主题中都得到了一个流,但是一旦我开始迭代ConsumerItreator
任何流,它就会在那里阻塞。正如它在文档中所写的那样,它将被阻止,直到它收到一条新消息。
有人知道如何从单个 Kafka 消费者中读取两个主题和两个流吗?
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();
while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
{
byte[] bytes = highPrioerityIterator.next().message();
Object obj = null;
CLoudDataObject thunderDataObject = null;
try
{
obj = SerializationUtils.deserialize(bytes);
if (obj instanceof CLoudDataObject)
{
thunderDataObject = (CLoudDataObject) obj;
System.out.println(thunderDataObject);
// TODO Got the Thunder object here, now write code to send it to Thunder service.
}
}
catch (Exception e)
{
}
}