0

出于我自己的目的,如果 Kafka Consumer 无法连接到代理,我需要停止 Spring Boot 应用程序。我的意思是,当 Kafka Consumer 尝试池化消息时,我们可以看到以下日志:

[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Bootstrap broker localhost:9094 (id: -1 rack: null) disconnected
[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Connection to node -1 (localhost/127.0.0.1:9094) could not be established. Broker may not be available.

当主题或代理不可用时,这是标准行为。结果 - 应用程序不会停止。但是我需要。

我正在尝试添加以下属性,但它不起作用:

spring.kafka.consumer.fetch-max-wait=1000
spring.kafka.admin.fail-fast=true
spring.kafka.session.timeout.ms=1000

一般来说,我希望得到如下行为:如果消费者无法连接 - 关闭应用程序

  • Spring Boot 版本:2.3.8.RELEASE
  • 卡夫卡:spring-kafka-starter

Kafka 轮询示例:

consumer.poll(Duration.ofMinutes(5));
4

1 回答 1

0

作为其中一种方法 - 我们可以从上面的评论中使用。或者我刚刚根据以下代码创建了验证并且它有效

    public void validate() {
        try {
            consumer.listTopics(Duration.ofSeconds(10));
        } catch (TimeoutException e) {
            logger.error("Topics doesn't exist OR unavailable broker");
            System.exit(1);
        }
    }
    
于 2021-02-11T14:11:04.590 回答