
I have a Kafka cluster running on Confluent Cloud, but I cannot reset the committed offsets from the UI. So I am trying to do it through the Kafka CLI, as follows:

kafka-consumer-groups --bootstrap-server=my_cluster.confluent.cloud:9092 --list

However, I get the following error. I think it has to do with how I am authenticating.

Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listConsumerGroups(ConsumerGroupCommand.scala:203)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:198)
    at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:70)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient$24.handleFailure(KafkaAdminClient.java:3368)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.handleTimeoutFailure(KafkaAdminClient.java:838)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:804)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:934)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:1013)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1367)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: findAllBrokers

2 Answers


Here is an example of listing consumer groups:

kafka-consumer-groups --bootstrap-server <ccloud kafka>:9092 --command-config consumer.properties --list

consumer.properties:

bootstrap.servers=<ccloud kafka>:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<KEY>" password="<SECRET>";
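
Since the original goal was to reset committed offsets, the same properties file can presumably be reused for that step. The following is only a sketch: the group name, topic name, and the reset_offsets helper are hypothetical placeholders, not taken from the question, and the binary may be called kafka-consumer-groups.sh in an Apache Kafka download. Note that the tool only resets offsets while the consumer group is inactive.

```shell
# Hypothetical values; replace them with your own cluster, group, and topic.
KAFKA_CG="${KAFKA_CG:-kafka-consumer-groups}"   # kafka-consumer-groups.sh in Apache Kafka tarballs
BOOTSTRAP="my_cluster.confluent.cloud:9092"
GROUP="my-consumer-group"                       # assumption: not from the original post
TOPIC="my-topic"                                # assumption: not from the original post
CONFIG="consumer.properties"                    # the SASL_SSL settings shown above

# reset_offsets MODE
# MODE is --dry-run (only print the planned offsets) or --execute (apply them).
reset_offsets() {
  "$KAFKA_CG" \
    --bootstrap-server "$BOOTSTRAP" \
    --command-config "$CONFIG" \
    --group "$GROUP" \
    --topic "$TOPIC" \
    --reset-offsets --to-earliest "$1"
}

# Typical use: preview first, then apply once the output looks right.
#   reset_offsets --dry-run
#   reset_offsets --execute
```

Running with --dry-run first is a cheap way to confirm both that authentication works and that the new offsets are the ones you expect before anything is changed.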
answered 2022-02-15T19:19:35.453

You need to use the --command-config option to point to a properties file that contains your CCloud credentials.
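
As a rough illustration, such a file could be generated from environment variables so the API key and secret are not typed into the command line. This is a sketch under assumptions: the variable names CCLOUD_API_KEY and CCLOUD_API_SECRET and the file name ccloud.properties are made up for this example.

```shell
# Assumption: the API key and secret are exported as environment variables.
# The fallbacks below are placeholders only, so the sketch runs without them.
: "${CCLOUD_API_KEY:=<KEY>}"
: "${CCLOUD_API_SECRET:=<SECRET>}"

# Create the file readable by the current user only, since it holds a secret.
umask 077
cat > ccloud.properties <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CCLOUD_API_KEY}" password="${CCLOUD_API_SECRET}";
EOF

# Then pass it to the CLI, for example:
#   kafka-consumer-groups --bootstrap-server my_cluster.confluent.cloud:9092 \
#     --command-config ccloud.properties --list
```

The sasl.jaas.config value has to stay on a single line (or use a trailing backslash for continuation), otherwise the properties parser treats the second line as a separate key.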

answered 2022-02-14T18:19:44.203