0

这是测试代码

    final Flowable<Integer> f1 = Flowable.fromPublisher(s -> {
        s.onNext(Integer.valueOf(1));
        s.onComplete();
    });


    final Flowable<Integer> f2 = Flowable.fromPublisher(s -> {
        s.onNext(Integer.valueOf(2));
        s.onComplete();
    });

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
            .blockingSubscribe(System.out::println);

它会得到

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386)

我不明白为什么?

如果我像这样更新代码

    final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> {
        s.onNext(Integer.valueOf(1));
        s.onComplete();
    }).onErrorResumeNext(Flowable.empty());


    final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> {
        s.onNext(Integer.valueOf(2));
        s.onComplete();
    }).onErrorResumeNext(Flowable.empty());

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
            .blockingSubscribe(System.out::println);

它将按预期打印 12 。但为什么?这没有意义。

4

1 回答 1

1

问题是您违反了Publisher<T>使用fromPublisher.

Reactive Streams发布者需要以合同中规定的非常具体的方式行事。该行为包括Subscriber.onSubscribe()在进行任何其他调用之前调用并尊重该订阅者的背压。

因为您不调用onSubscribe内部queue变量永远不会被初始化,并且queue.offer对其onNext方法的调用会导致 NPE。

据推测,通过使用onErrorResumeNext该实现可确保正确调用所有内容,“修复”无效状态。

要解决您的问题,有两种可能性:

  1. 不要使用Flowable.fromPublisher. 它旨在与 Reactive Streams 宣言的其他实现相衔接,并且没有任何保护措施。而是使用Flowable.create正确处理初始化和背压的方法。
  2. 使用非背压感知Observable,因为您的用例似乎并不关心背压。再次使用该Observable.create方法安全使用。
于 2016-12-10T02:41:35.713 回答