3

我正在使用 Kafka v0.9.0.1 (Scala v2.11) 和com.101tec:zkclientv0.7。我正在尝试使用AdminUtils创建一个 kafka 主题。我的代码如下。

String zkServers = "node1:2181,node2:2181,node3:2181,node4:2181";
Integer sessionTimeout = (int)TimeUnit.SECONDS.toMillis(10L);
Integer connectionTimeout = (int)TimeUnit.SECONDS.toMillis(8L);
ZkSerializer zkSerializer = ZKStringSerializer$.MODULE$;
Boolean isSecureKafkaCluster = false;
String topic = "test";
Integer partitions = 1;
Integer replication = 3;

ZkClient zkClient = new ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkServers), isSecureKafkaCluster)
if(!AdminUtils.topicExists(zkUtils, topic)) {
 AdminUtils.createTopic(zkUtils, topic, partitions, replications, new Properties());
}

该主题实际上是通过以下命令验证创建的。

bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test

但是,输出并不像预期的那样。

主题:测试 PartitionCount:1 ReplicationFactor:1 配置:
主题:测试分区:0领导者:-1副本:4 Isr:

如果我使用脚本。

bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 3 --partitions 1 --topic topic1

然后我看到以下内容。

Topic:test1 PartitionCount:1    ReplicationFactor:3 Configs:
Topic: test1    Partition: 0    Leader: 2   Replicas: 2,3,4 Isr: 2

关于我做错了什么的任何想法?效果是,如果我使用 aProducer向主题发送 a ProducerRecord,则主题上不会显示任何内容。

4

1 回答 1

4

我遇到过同样的问题。
解决方案:

  1. 清理 zk 元信息(/brokers/topic)

  2. 清除所有 /data 目录以删除属于该主题的所有主题分区文件夹

  3. 一次重新启动整个 kafka 集群所有代理。

  4. 重新创建该主题。

这解决了我的问题。而且我认为根本原因是 kafka 本身无法处理干净删除主题的缺陷(自 v1.0.0 以来已修复此问题)。

编辑:即使使用 Kafka(>= v1.0.0),如果您正在删除一个空主题或您的 kafka 集群处于极端负载下,有时删除主题也会卡住。
解决方案就像重新启动控制器代理一样简单。(你总是可以在 ZK: /controller by 下找到控制器代理get /controller)。所以只需重新启动一个代理而不是整个 kafka 集群。

于 2017-12-11T22:08:44.137 回答