我正在尝试检查传递给我的“启动流”方法的主题是否是我的程序连接到的 Kafka 中有效/已经存在的主题。
我知道 Java 有 KafkaConsumer.ListTopics,但我正在使用 akka-kafka 库,而且 ConsumerSetting 似乎没有相同的方法。我可以使用 Kafka-topics --list 命令执行脚本以列出 kafka 主题的代码,但我更喜欢一种不那么笨拙的方式。
我正在尝试检查传递给我的“启动流”方法的主题是否是我的程序连接到的 Kafka 中有效/已经存在的主题。
我知道 Java 有 KafkaConsumer.ListTopics,但我正在使用 akka-kafka 库,而且 ConsumerSetting 似乎没有相同的方法。我可以使用 Kafka-topics --list 命令执行脚本以列出 kafka 主题的代码,但我更喜欢一种不那么笨拙的方式。
您必须从该 Settings 对象创建KafkaConsumer
,然后您可以使用您提到的 API 方法。
您不应该将 Zookeeper 直接暴露给未经身份验证的客户端。
最好的方法是从 zookeeper 获取主题列表,如下所示:
import org.apache.zookeeper.ZooKeeper;
val zk = new ZooKeeper("localhost:2181", 10000, null)
val topics = zk.getChildren("/brokers/topics", false)