3

Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。

在学习错误处理运算符之前,您必须记住,反应序列中的任何错误都是终端事件。即使使用了错误处理运算符,它也不会让原始序列继续下去。相反,它将 onError 信号转换为新序列的开始(回退序列)。换句话说,它取代了它上游的终止序列。

但是 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -

通过从序列中删除犯罪元素并继续处理后续元素,让上游兼容的运算符从错误中恢复。

onErrorContinue 不被视为“错误处理运算符”吗?

它似乎确实允许原始序列继续 -

        Flux.range(1, 5)
                .map(i -> {
                    if (i == 3) {
                        throw new RuntimeException("Forcing exception for " + i);
                    }
                    return i;
                })
                .doOnNext(i -> System.out.println(i))
                .onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
                .subscribe();

结果(删除了 3 个但继续使用后续元素)

1
2
4
5
Error while processing 3 - Forcing exception for 3

Process finished with exit code 0

文档确实指出 onErrorContinue 依赖于运营商的支持。有没有其他方法可以让原始序列(源 Flux)继续适用于所有操作员?如果出现错误(onErrorResume 行为),我不希望使用备用助焊剂来替换我的源助焊剂 - 我只想忽略问题元素并继续使用源助焊剂。

编辑 1(我的用例)

我有一个反应堆 kafka 源通量 & 我想无限地消耗它,不管错误如何。我使用的是 onErrorContinue,但根据在这篇文章中收到的反馈,我已将其替换为 onErrorResume。下面是我目前拥有的代码,但我不确定它是否在所有情况下都有效(通过“工作”,我不断地从 kafka 流式传输,不管任何错误)。请问有什么建议吗?

        KafkaReceiver.create(receiverOptions)
                .receive()
                .flatMap(record -> processRequest(record.value())
                        .doOnNext(e -> record.receiverOffset().acknowledge())
                        .doOnError(e -> {
                            System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
                            record.receiverOffset().acknowledge();
                        })
                        .onErrorResume(e -> Mono.empty()))
                .repeat(() -> true)
                .retryWhen(Retry.indefinitely())
                .doFinally(signalType -> {
                    //dont expect control to ever reach here
                    System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
                })
                .subscribe();
4

1 回答 1

2

反应器遵循的反应流规范声明流中的所有错误都是终端事件 - 这就是反应器错误处理文档的基础。为了处理错误,必须发生错误,并且根据规范,该错误必须是终端错误。在所有符合规范的案例(几乎是所有案例)中都是如此。

onErrorContinue()然而,它是一种相当特殊的运算符。它是一种错误处理运算符,但它通过允许删除错误和继续流来破坏反应规范。在您希望连续处理、永不停止、带有错误旁道的情况下,它可能很有用。

话虽如此,它有很多问题——不仅是它需要特定的运营商支持(因为完全符合反应流规范的运营商可能会完全无视onErrorContinue()但仍然保持合规),还有一大堆其他问题。如果您对一些背景阅读感兴趣,我们中的一些人会在此处讨论这些内容。将来它可能会被转移到一个unsafe()分组或类似的地方,但这是一个非常难以解决的问题。

话虽如此,核心建议是目前在Javadoc 中不是在所有情况下都使用onErrorContinue(),而是在非常具体的情况下使用,而是onErrorResume()在每个单独的发布者上使用:

//Stream
.flatMap(id -> repository.retrieveById(id)
      .doOnError(System.err::println)
      .onErrorResume(e -> Mono.empty()))

这会带来更大的冗长性,并且可能会带来很小的性能损失(我没有验证过),但其优点是其行为更加清晰,不会破坏反应流规范,并且不需要特定的操作员支持即可工作。这是我在几乎所有情况下都推荐的——我个人觉得onErrorContinue()在大多数情况下,它的微妙之处太复杂了,无法推理。

于 2021-10-09T23:34:07.887 回答