当多个onErrorContinue添加到管道以处理从flatMap抛出的特定类型的异常时,异常处理无法按预期工作。
我希望下面的代码应该删除元素 1 到 6,并且订阅者应该使用元素 7 到 10。
public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))
.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))
.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))
.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}
public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}
public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}
这是我得到的输出:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6
问题:为什么第二个onErrorContinue
没有被调用,但是异常发送给了订阅者?
附加说明:
如果我删除 1st 和 2nd onErrorContinue
,那么所有异常都由 3rd 处理onErrorContinue
。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更清晰的异常处理而不是添加if..else
块。
这个问题与为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅有何不同?
1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题对线程没有任何关注,即使 add Thread.sleep(10000)
after . subscribe
,行为也没有变化。