Reactor 错误处理文档 ( https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling ) 指出错误处理运算符不会让原始序列继续。
在学习错误处理运算符之前,您必须记住,反应序列中的任何错误都是终端事件。即使使用了错误处理运算符,它也不会让原始序列继续下去。相反,它将 onError 信号转换为新序列的开始(回退序列)。换句话说,它取代了它上游的终止序列。
但是 onErrorContinue 的 javadoc 声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html) -
通过从序列中删除犯罪元素并继续处理后续元素,让上游兼容的运算符从错误中恢复。
onErrorContinue 不被视为“错误处理运算符”吗?
它似乎确实允许原始序列继续 -
Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Forcing exception for " + i);
}
return i;
})
.doOnNext(i -> System.out.println(i))
.onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
.subscribe();
结果(删除了 3 个但继续使用后续元素)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process finished with exit code 0
文档确实指出 onErrorContinue 依赖于运营商的支持。有没有其他方法可以让原始序列(源 Flux)继续适用于所有操作员?如果出现错误(onErrorResume 行为),我不希望使用备用助焊剂来替换我的源助焊剂 - 我只想忽略问题元素并继续使用源助焊剂。
编辑 1(我的用例)
我有一个反应堆 kafka 源通量 & 我想无限地消耗它,不管错误如何。我使用的是 onErrorContinue,但根据在这篇文章中收到的反馈,我已将其替换为 onErrorResume。下面是我目前拥有的代码,但我不确定它是否在所有情况下都有效(通过“工作”,我不断地从 kafka 流式传输,不管任何错误)。请问有什么建议吗?
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> processRequest(record.value())
.doOnNext(e -> record.receiverOffset().acknowledge())
.doOnError(e -> {
System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
record.receiverOffset().acknowledge();
})
.onErrorResume(e -> Mono.empty()))
.repeat(() -> true)
.retryWhen(Retry.indefinitely())
.doFinally(signalType -> {
//dont expect control to ever reach here
System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
})
.subscribe();