2

当多个onErrorContinue添加到管道以处理从flatMap抛出的特定类型的异常时,异常处理无法按预期工作。

我希望下面的代码应该删除元素 1 到 6,并且订阅者应该使用元素 7 到 10。

public class FlatMapOnErrorContinueExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .flatMap(number -> {
                    if (number <= 3) {
                        return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
                    } else if (number > 3 && number <= 6) {
                        return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
                    } else {
                        return Mono.just(number);
                    }
                })
                .onErrorContinue(NumberLesserThanThree.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

                .onErrorContinue(NumberLesserThanSixButGretherThan3.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

                .onErrorContinue((throwable, object) ->
                        System.err.println("Exception: " + throwable.getMessage()))

                .subscribe(number -> System.out.println("number is " + number),
                        error -> System.err.println("Exception in Subscription " + error.getMessage()));
    }

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }
}

这是我得到的输出:

Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

问题:为什么第二个onErrorContinue没有被调用,但是异常发送给了订阅者?

附加说明: 如果我删除 1st 和 2nd onErrorContinue,那么所有异常都由 3rd 处理onErrorContinue。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更清晰的异常处理而不是添加if..else块。

这个问题与为什么 Thread.sleep() 会触发对 Flux.interval() 的订阅有何不同?

1)这个关于异常处理和异常处理顺序的问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题对线程没有任何关注,即使 add Thread.sleep(10000)after . subscribe,行为也没有变化。

4

1 回答 1

2

这又归结为onErrorContinue. 它打破了规则,因为它不会“捕获”错误,然后因此改变下游的行为,它实际上允许支持操作员“向前看”并做出相应的行为,从而改变上游的结果。

这很奇怪,并导致一些不是立即显而易见的行为,例如这里的情况。据我所知,所有支持运算符只向前看下一个 onErrorContinue运算符,而不是递归搜索所有此类运算符。相反,他们将评估下一个谓词onErrorContinue(在这种情况下,它是否属于某种类型),然后做出相应的行为——如果谓词返回 true,则调用处理程序,否则将错误抛出下游。(在匹配谓词之前,它不会移动到下一个 onErrorContinue运算符,然后是下一个运算符。)

显然这是一个人为的例子——但由于这些特质,我几乎总是建议避免使用onErrorContinue. flatMap()在涉及的地方有两种“正常”的方式:

  1. 如果flatMap()其中有一个“内部反应链”,那就是它调用另一个方法或一系列返回发布者的方法 - 然后只需onErrorResume()flatMap()调用结束时使用来处理这些错误。您可以链接onErrorResume(),因为它适用于下游,而不是上游运营商。这是迄今为止最常见的情况。

  2. IfflatMap()是 if / else 的命令式集合,它返回不同的发布者,例如它在这里,并且您希望/必须保持命令式样式,抛出异常而不是 using Mono.error(),并在适当Mono.empty()的情况下捕获,以防出错:

    .flatMap(number -> {
        try {
            if (number <= 3) {
                throw new NumberLessThanThree();
            } else if (number <= 6) {
                throw new NumberLessThanSixButGreaterThan3();
            } else {
                return Mono.just(number);
            }
        }
        catch(NumberLessThanThree ex) {
            //Handle it
            return Mono.empty();
        }
        catch(NumberLessThanSixButGreaterThan3 ex) {
            //As above
        }
    })

一般来说,使用这两种方法中的一种可以容易地推断正在发生的事情。

(为了在阅读评论后保持完整性 - 这与反应链在主线程退出之前无法完成无关。)

于 2020-06-05T10:32:13.757 回答