我需要用Java编写一个冒烟测试来验证系统是否连接到kafka,
有人有什么主意吗?我找到了这篇文章:
但是从Java代码做起来太复杂了,我不认为这是我应该使用的方向。
提前致谢。
我需要用Java编写一个冒烟测试来验证系统是否连接到kafka,
有人有什么主意吗?我找到了这篇文章:
但是从Java代码做起来太复杂了,我不认为这是我应该使用的方向。
提前致谢。
I had the same question and I don't want to leave this question without any answer. I read a lot about how I can check the connection and most of the answers I found was checking the connection with Zk, but I really want to check the connection directly with Kafka server.
What I did is to create a simple KafkaConsumer and list all the topics with listTopics(). If the connection is success, then you will get something as a return. Otherwise, you will get a TimeoutException
.
def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
val props = new Properties()
props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
props.put("group.id", kafkaParams.get("group.id").get.toString)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val simpleConsumer = new KafkaConsumer[String, String](props)
simpleConsumer.listTopics()
}
then you can wrap this method in a try-catch
sentence to catch the exception.
您可以使用以下命令检查服务器是否正在运行:
ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
if (brokers.isEmpty()) {
// No brokers available
} else {
// There are brokers available
}