我正在使用 Kafka v0.9.0.1 (Scala v2.11) 和com.101tec:zkclient
v0.7。我正在尝试使用AdminUtils
创建一个 kafka 主题。我的代码如下。
String zkServers = "node1:2181,node2:2181,node3:2181,node4:2181";
Integer sessionTimeout = (int)TimeUnit.SECONDS.toMillis(10L);
Integer connectionTimeout = (int)TimeUnit.SECONDS.toMillis(8L);
ZkSerializer zkSerializer = ZKStringSerializer$.MODULE$;
Boolean isSecureKafkaCluster = false;
String topic = "test";
Integer partitions = 1;
Integer replication = 3;
ZkClient zkClient = new ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServers), isSecureKafkaCluster)
if(!AdminUtils.topicExists(zkUtils, topic)) {
AdminUtils.createTopic(zkUtils, topic, partitions, replications, new Properties());
}
该主题实际上是通过以下命令验证创建的。
bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test
但是,输出并不像预期的那样。
主题:测试 PartitionCount:1 ReplicationFactor:1 配置: 主题:测试分区:0领导者:-1副本:4 Isr:
如果我使用脚本。
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 3 --partitions 1 --topic topic1
然后我看到以下内容。
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2
关于我做错了什么的任何想法?效果是,如果我使用 aProducer
向主题发送 a ProducerRecord
,则主题上不会显示任何内容。