2

我将 Spring Reactor 与 Spring Cloud Stream (GCP Pub/Sub Binder) 一起使用并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .map(msg -> {
            if (true) { 
                throw new RuntimeException("exception encountered!");
            }
            return msg;
        })
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

我期望的行为是看到“无法使用消息”打印,但是,这似乎不是发生的事情。在向我看到/信号的链添加.log()调用时,我希望看到信号。onNextonCompleteonError

我的实际代码如下所示:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

我注意到在我的服务类的深处,我试图对我的 Reactor 发布者进行错误处理。但是,onError使用 Spring Cloud Stream 时不会出现该信号。如果我只是myService.processMessage(msg)在单元测试中调用我的服务并模拟异常,我的反应链将正确传播错误信号。

当我连接到 Spring Cloud Stream 时,这似乎是一个问题。我想知道 Spring Cloud Function/Stream 是否正在执行任何全局错误包装?

在我的非平凡代码中,我确实注意到此错误消息可能与为什么我没有收到错误信号有关?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

onError为了进一步混淆,如果我将 Spring Cloud Stream 绑定切换到非反应式实现,我能够在反应链中获得信号,如下所示:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
        .subscribe();
}
4

2 回答 2

2

所以这是我从自己的调查中收集到的,也许这可能对其他人有所帮助。预先警告,我可能没有使用正确的“Spring Reactor Language”,但这就是我最终解决它的方式......

Hoxton.SR5中,an包含在管理通量订阅onErrorContinue的反应绑定中。问题onErrorContinue在于它通过在失败的运营商处应用 BiConsumer 函数来影响上游运营商(如果支持)。

这意味着当map/flatMap操作符发生错误时,onErrorContinueBiConsumer 将启动并将下游信号修改为onComplete()( Mono<T>) 或request(...)(如果它从 a 请求新元素Flux<T>)。这导致我们的doOnError(...)操作员没有执行,因为没有onError()信号。

最终,SCS 团队决定删除这个错误处理包装器Hoxton.SR6不再有这个onErrorContinue。但是,这意味着传播到 SCS 绑定的异常将导致 Flux 订阅被切断。由于没有订阅者,后续消息将无处可路由。

此错误处理已传递给客户端,我们向内部发布onErrorResume者添加了一个操作符以有效地丢弃错误信号。当发布者中遇到错误时,会将发布者切换到作为参数传入的后备发布者,并从运营商链中的该点恢复。在我们的例子中,这个后备发布者只是简单地返回,它允许我们丢弃错误信号,同时仍然允许内部错误处理机制运行,同时也不影响外部源发布者。myService::processMessageonErrorResumeMono.empty()

onErrorResume示例/说明

上述技术可以用一个非常简单的例子来说明。

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Flux.just(4, 5, 6))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

以上将Flux<Integer>输出以下内容:

Element: 1
Element: 4
Element: 5
Element: 6

由于在 element 遇到错误2onErrorResume回退开始,新发布者Flux.just(4, 5, 6)有效地从回退中恢复。在我们的例子中,我们不想影响源发布者(即Flux.just(1, 2, 3))。我们只想删除错误的元素 ( 2) 并继续下一个元素 ( 3)。

我们不能简单地改成Flux.just(4, 5, 6)这样:Flux.empty()Mono.empty()

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

这将导致输出以下内容:

Element: 1

这是因为onErrorResume已将上游发布者替换为后备发布者(即Mono.empty())并从那时起恢复

为了实现我们想要的输出:

Element: 1
Element: 3

我们必须将onErrorResume操作符放在 的内部发布者上flatMap

public Mono<Integer> func(int i) {
    return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}

Flux.just(1, 2, 3)
    .flatMap(i -> func(i)
        onErrorResume(t -> Mono.empty()))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

现在,onErrorResume内部发布者返回的唯一效果是func(i). 如果 中的操作员发生错误func(i)onErrorResume将回退到Mono.empty()有效地完成而Mono<T>不会炸毁。这也仍然允许在回退运行之前应用内部doOnError的错误处理运算符(例如) 。这是因为,与 不同的是,它不会影响上游运算符并更改错误位置处的下一个信号。 func(i)onErrorContinue

最终解决方案

在我的问题中重用代码片段,我已将我的 Spring Cloud 版本升级到Hoxton.SR6并将代码更改为如下所示:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(msg -> myService.processMessage(msg)
            .onErrorResume(throwable -> Mono.empty())
        )
        .then();
}

请注意,onErrorResume位于内部发布者(在 内flatMap)。

于 2020-10-06T15:18:26.773 回答
0

我认为问题存在于以下代码中:

    .map(msg -> new RuntimeException("exception encountered!"))

地图行中的 lambda 返回异常,而不是抛出异常。

于 2020-10-01T21:45:37.403 回答