5

我正在尝试使用 Spring Cloud Stream 和 Kafka 集成创建一个 Spring Boot 应用程序。我在 Kafka 中创建了一个带有 1 个分区的示例主题,并已从根据此处给出的方向创建的 Spring Boot 应用程序发布到该主题

http://docs.spring.io/spring-cloud-stream/docs/1.0.2.RELEASE/reference/htmlsingle/index.html

https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/

Spring Boot 应用程序 -

@SpringBootApplication
public class MyApplication {

    private static final Log logger = LogFactory.getLog(MyApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

卡夫卡生产者类

@Service
@EnableBinding(Source.class)
public class MyProducer {

    private static final Log logger = LogFactory.getLog(MyProducer.class);

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        TimeInfo t = new TimeInfo(new Timestamp(new Date().getTime())+"","Label");
        MessageBuilder<TimeInfo> m = MessageBuilder.withPayload(t);
        return () -> m.build();
    }

    public static class TimeInfo{

        private String time;
        private String label;

        public TimeInfo(String time, String label) {
            super();
            this.time = time;
            this.label = label;
        }

        public String getTime() {
            return time;
        }

        public String getLabel() {
            return label;
        }

    }
}

除了我想处理异常时,一切都运行良好。

如果 Kafka 主题出现故障,我可以在应用程序的日志文件中看到 ConnectionRefused 异常,但内置的重试逻辑似乎是在不停地重试!

根本没有抛出异常供我处理和做进一步的异常处理。我已经阅读了上面 Spring Cloud Stream 文档中 Kafka 的 Producer 选项和 Binder 选项,我看不到任何自定义选项可以让我一直捕获这个异常。

我是 Spring Boot / Spring Cloud Stream / Spring Integration 的新手(这似乎是云流项目的底层实现)。

你们还有什么知道可以将此异常级联到我的 Spring Cloud Stream 应用程序的吗?

4

0 回答 0