1

我有三个整数观察者,如下所示:

第一观察员:

 private Observer<Integer> getFirstObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext First " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {

            }
        };
    }

第二观察员:

private Observer<Integer> getSecondObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext Second " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };
    }

第三观察者:

    private Observer<Integer> getThirdObserver() {
    return new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(LOG_TAG, "onNext Third " + integer);

        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };
}

现在,如果我喜欢下面的代码:

    void asyncSubjectDemo1() {
        Observable<Integer> observable = Observable.just(1, 2, 3, 4);
        PublishSubject<Integer> asyncSubject = PublishSubject.create();
        observable.subscribe(asyncSubject);
        asyncSubject.subscribe(getFirstObserver());
        asyncSubject.subscribe(getSecondObserver());
        asyncSubject.subscribe(getThirdObserver());

    }

Logcat 中没有按照文档中的预期打印任何内容

PublishSubject 仅向观察者发出那些在订阅时间之后由源 Observable(s) 发出的项目。

但是,如果我在创建 Observable 时添加 observable,如下所示并运行它

void asyncSubjectDemo1() {
            Observable<Integer> observable = Observable.just(1, 2, 3, 4).observeOn(AndroidSchedulers.mainThread());
            PublishSubject<Integer> asyncSubject = PublishSubject.create();
            observable.subscribe(asyncSubject);
            asyncSubject.subscribe(getFirstObserver());
            asyncSubject.subscribe(getSecondObserver());
            asyncSubject.subscribe(getThirdObserver());

        }

以下是输出

D/MY_LOG: onNext First 1
D/MY_LOG: onNext Second 1
D/MY_LOG: onNext Third 1
D/MY_LOG: onNext First 2
D/MY_LOG: onNext Second 2
D/MY_LOG: onNext Third 2

为什么在这种情况下有任何歧义?

4

1 回答 1

1

请阅读 PublishSubject 的 Javadoc:http: //reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html

“一个 PublishSubject 不保留/缓存项目,因此,一个新的观察者不会收到任何过去的项目。”

在第一种情况下,您订阅了PublishSubject一个同步源,因此此时,所有项目都在执行之前完成asyncSubject.subscribe(getFirstObserver());

在第二种情况下,源现在已安排好,当您订阅PublishSubject它时,您会创建一个窗口或竞赛(取决于方法执行的位置),以便asyncSubject.subscribe(getFirstObserver());等有机会及时订阅主题并因此接收以后的项目。

于 2020-05-15T21:43:34.840 回答