我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 一起使用并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。在向我看到/信号的链添加.log()
调用时,我希望看到信号。onNext
onComplete
onError
我的实际代码如下所示:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是,onError
使用 Spring Cloud Stream 时不会出现该信号。如果我只是myService.processMessage(msg)
在单元测试中调用我的服务并模拟异常,我的反应链将正确传播错误信号。
当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?
在我的非平凡代码中,我确实注意到此错误消息可能与为什么我没有收到错误信号有关?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
onError
为了进一步混淆,如果我将 Spring Cloud Stream 绑定切换到非反应式实现,我能够在反应链中获得信号,如下所示:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}