我正在使用 kafka 高级消费者。当我启动消费者时,它会找到所有新消息。当我使用 Java kafka 生产者生成新消息时,它会发现它们。但是一分钟后,它继续循环,但没有找到新消息。当我在调试器中暂停执行时,消费者突然开始寻找要消费的消息。我在 Java 中使用 0.8.0 版。请注意,在发生错误时,使用消息的进程将在单独的“错误”主题中生成消息。当我停止产生这些错误消息时,我就不再遇到这个问题了。
问问题
315 次
1 回答
0
这个问题似乎是我没有看到报告的 kafka 错误。如果您使用同一个 ConsumerConnector 创建多个 ConsumerIterator(通过为其提供多个主题的映射),则主题会每隔一段时间在 ConsumerIterators 中切换。如果您尝试通过在调试器中暂停来查看 consumerIterator,它们会切换回来。
这是我创建具有错误的 ConsumerIterators 的旧代码:
/**
* @param zookeeperAddresses (includes the port number)
* @param topics all topics to be consumed.
* @return A list of ConsumerIterators.
*/
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
String groupId = "client_" + topics.get(0);
LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(zookeeperAddresses, groupId));
consumers.add(consumer);
Map<String, Integer> topicCountMap = new HashMap<>();
for (String topic : topics) {
topicCountMap.put(topic, Integer.valueOf(1));
}
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<ConsumerIterator> topicConsumers = new LinkedList<>();
for (String topic : topics) {
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
assert(streams.size() == 1);
ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
topicConsumers.add(consumerIterator);
}
return topicConsumers;
}
这是解决此错误的固定代码:
/**
* @param zookeeperAddresses (includes the port number)
* @param topics all topics to be consumed.
* @return A list of ConsumerIterators.
*/
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
String groupId = "client_" + topics.get(0);
LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
List<ConsumerIterator> topicConsumers = new LinkedList<>();
for (String topic : topics) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(zookeeperAddresses, groupId));
consumers.add(consumer);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, Integer.valueOf(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
assert(streams.size() == 1);
ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
topicConsumers.add(consumerIterator);
}
return topicConsumers;
}
于 2014-12-10T11:47:16.467 回答