I am using Kafka_2.12 version 2.3.0. To delete a topic, I am using the following code snippet:
AdminClient adminClient = KafkaAdminClient.create(conf);
adminClient.deleteTopics(Collections.singleton(topic));
However, when the above code runs, my Kafka server shuts down. Here are the Kafka logs.
Controller log:
[2019-11-06 18:36:45,151] DEBUG [Controller id=0] Delete topics listener fired for topics DemoTopic to be deleted (kafka.controller.KafkaController)
[2019-11-06 18:36:45,151] INFO [Controller id=0] Starting topic deletion for topics DemoTopic (kafka.controller.KafkaController)
[2019-11-06 18:36:45,151] INFO [Topic Deletion Manager 0] Handling deletion for topics DemoTopic (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,151] INFO [Topic Deletion Manager 0] Deletion of topic DemoTopic (re)started (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,151] INFO [Topic Deletion Manager 0] Topic deletion callback for DemoTopic (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,161] INFO [Topic Deletion Manager 0] Partition deletion callback for DemoTopic-0 (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,181] DEBUG The stop replica request (delete = false) sent to broker 0 is StopReplicaRequestInfo([Topic=DemoTopic,Partition=0,Replica=0],false) (kafka.controller.ControllerBrokerRequestBatch)
[2019-11-06 18:36:45,181] DEBUG [Topic Deletion Manager 0] Deletion started for replicas [Topic=DemoTopic,Partition=0,Replica=0] (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,191] DEBUG The stop replica request (delete = true) sent to broker 0 is StopReplicaRequestInfo([Topic=DemoTopic,Partition=0,Replica=0],true) (kafka.controller.ControllerBrokerRequestBatch)
[2019-11-06 18:36:45,241] DEBUG [Controller id=0] Delete topic callback invoked on StopReplica response received from broker 0: request error = NONE, partition errors = Map(DemoTopic-0 -> KAFKA_STORAGE_ERROR) (kafka.controller.KafkaController)
[2019-11-06 18:36:45,251] DEBUG [Topic Deletion Manager 0] Deletion failed for replicas [Topic=DemoTopic,Partition=0,Replica=0]. Halting deletion for topics Set(DemoTopic) (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,251] INFO [Topic Deletion Manager 0] Halted deletion of topics DemoTopic due to replica deletion failure (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,251] INFO [Topic Deletion Manager 0] Handling deletion for topics DemoTopic (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,251] INFO [Topic Deletion Manager 0] Retrying deletion of topic DemoTopic since replicas [Topic=DemoTopic,Partition=0,Replica=0] were not successfully deleted (kafka.controller.TopicDeletionManager)
[2019-11-06 18:36:45,261] DEBUG The stop replica request (delete = false) sent to broker 0 is StopReplicaRequestInfo([Topic=DemoTopic,Partition=0,Replica=0],false) (kafka.controller.ControllerBrokerRequestBatch)
Server log:
[2019-11-06 18:36:45,171] INFO [GroupCoordinator 0]: Removed 0 offsets associated with deleted partitions: DemoTopic-0. (kafka.coordinator.group.GroupCoordinator)
[2019-11-06 18:36:45,191] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(DemoTopic-0) (kafka.server.ReplicaFetcherManager)
[2019-11-06 18:36:45,191] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(DemoTopic-0) (kafka.server.ReplicaAlterLogDirsManager)
[2019-11-06 18:36:45,191] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(DemoTopic-0) (kafka.server.ReplicaFetcherManager)
[2019-11-06 18:36:45,191] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(DemoTopic-0) (kafka.server.ReplicaAlterLogDirsManager)
[2019-11-06 18:36:45,221] ERROR Error while renaming dir for DemoTopic-0 in log dir C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\DemoTopic-0 -> C:\tmp\kafka-logs\DemoTopic-0.28df372a01d341ef9db641185dfe97e0-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:815)
at kafka.log.Log.$anonfun$renameDir$2(Log.scala:784)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
at kafka.log.Log.renameDir(Log.scala:782)
at kafka.log.LogManager.asyncDelete(LogManager.scala:858)
at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:374)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:261)
at kafka.cluster.Partition.delete(Partition.scala:368)
at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:368)
at kafka.server.ReplicaManager.$anonfun$stopReplicas$2(ReplicaManager.scala:398)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:97)
at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:396)
at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:219)
at kafka.server.KafkaApis.handle(KafkaApis.scala:118)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
Suppressed: java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\DemoTopic-0 -> C:\tmp\kafka-logs\DemoTopic-0.28df372a01d341ef9db641185dfe97e0-delete
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:812)
... 17 more
[2019-11-06 18:36:45,241] INFO [ReplicaManager broker=0] Stopping serving replicas in dir C:\tmp\kafka-logs (kafka.server.ReplicaManager)
[2019-11-06 18:36:45,241] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaFetcherManager)
[2019-11-06 18:36:45,251] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaAlterLogDirsManager)
[2019-11-06 18:36:45,311] INFO [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions __consumer_offsets-22,__consumer_offsets-30,__consumer_offsets-8,__consumer_offsets-21,__consumer_offsets-4,__consumer_offsets-27,__consumer_offsets-7,__consumer_offsets-9,__consumer_offsets-46,__consumer_offsets-25,__consumer_offsets-35,__consumer_offsets-41,__consumer_offsets-33,__consumer_offsets-23,__consumer_offsets-49,__consumer_offsets-47,__consumer_offsets-16,__consumer_offsets-28,__consumer_offsets-31,__consumer_offsets-36,__consumer_offsets-42,__consumer_offsets-3,__consumer_offsets-18,__consumer_offsets-37,__consumer_offsets-15,__consumer_offsets-24,__consumer_offsets-38,__consumer_offsets-17,__consumer_offsets-48,__consumer_offsets-19,__consumer_offsets-11,__consumer_offsets-13,__consumer_offsets-2,__consumer_offsets-43,__consumer_offsets-6,__consumer_offsets-14,__consumer_offsets-20,__consumer_offsets-0,__consumer_offsets-44,__consumer_offsets-39,__consumer_offsets-12,__consumer_offsets-45,__consumer_offsets-1,__consumer_offsets-5,__consumer_offsets-26,__consumer_offsets-29,__consumer_offsets-34,__consumer_offsets-10,__consumer_offsets-32,__consumer_offsets-40 and stopped moving logs for partitions because they are in the failed log directory C:\tmp\kafka-logs. (kafka.server.ReplicaManager)
[2019-11-06 18:36:45,311] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(DemoTopic-0) (kafka.server.ReplicaFetcherManager)
[2019-11-06 18:36:45,311] INFO Stopping serving logs in dir C:\tmp\kafka-logs (kafka.log.LogManager)
[2019-11-06 18:36:45,311] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(DemoTopic-0) (kafka.server.ReplicaAlterLogDirsManager)
[2019-11-06 18:36:45,321] ERROR Shutdown broker because all log dirs in C:\tmp\kafka-logs have failed (kafka.log.LogManager)
[2019-11-06 18:36:45,651] WARN Exception causing close of session 0x100019523470000: An existing connection was forcibly closed by the remote host (org.apache.zookeeper.server.NIOServerCnxn)
[2019-11-06 18:36:45,651] INFO Closed socket connection for client /127.0.0.1:53650 which had sessionid 0x100019523470000 (org.apache.zookeeper.server.NIOServerCnxn)
[2019-11-06 18:36:52,241] INFO Expiring session 0x100019523470000, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2019-11-06 18:36:52,241] INFO Processed session termination for sessionid: 0x100019523470000 (org.apache.zookeeper.server.PrepRequestProcessor)
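To narrow this down, here is a minimal sketch of what I understand the failing step to be, based only on the stack trace above: an atomic `Files.move` of the partition directory to a `-delete` name, followed by a non-atomic retry, with the first failure attached as suppressed. The class name `RenameSketch`, the scratch directory, and the `.sketch-delete` suffix are my own illustrations and not Kafka's code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper class; the names are illustrative and not taken from Kafka.
final class RenameSketch {

    // Try an atomic rename first; if it fails, retry as a plain move.
    // When both attempts fail, the second exception is thrown with the first
    // attached as suppressed, which matches the shape of the trace above.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed layout: a scratch directory standing in for the broker's log dir.
        Path logDir = Files.createTempDirectory("kafka-logs-sketch");
        Path partitionDir = Files.createDirectory(logDir.resolve("DemoTopic-0"));
        Path renamed = logDir.resolve("DemoTopic-0.sketch-delete");

        atomicMoveWithFallback(partitionDir, renamed);
        System.out.println("renamed exists: " + Files.exists(renamed));
    }
}
```

If a standalone rename like this works on a scratch directory while the broker's own rename fails, I would take that as a hint that something still holds files inside `DemoTopic-0` open on Windows rather than a permissions problem, but that is an assumption on my part and not something the logs confirm.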
Am I missing some additional step or configuration?