I have two observables and use concatDelayError to process them sequentially.

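My problem is that onNext and onCompleted are called early, before the processing has happened. How can I know that all processing has finished when using concatDelayError?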

Pseudocode:

public Observable<Integer> concat() {
    int x = 10;
    int y = 20;

    // Each source calls doSomeThing() when subscribed and emits the result.
    Observable<Integer> obx = Observable.create(emitter -> {
                try {
                    int resultX = doSomeThing();
                    emitter.onNext(resultX);
                    emitter.onCompleted();
                } catch (SQLiteException e) {
                    emitter.onError(e);
                }
            }, Emitter.BackpressureMode.BUFFER);
    Observable<Integer> oby = Observable.create(emitter -> {
                try {
                    int resultY = doSomeThing();
                    emitter.onNext(resultY);
                    emitter.onCompleted();
                } catch (SQLiteException e) {
                    emitter.onError(e);
                }
            }, Emitter.BackpressureMode.BUFFER);

    // Run obx, then oby; an error is delayed until both have terminated.
    return Observable.concatDelayError(obx, oby)
             .compose(applySchedulers())
             .replay().autoConnect();
}

    //somewhere else

    concat().subscribe(mReplaySubject);


    //somewhere else

     mReplaySubject.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            launchActivity(SplashActivity.this, HomeActivity.class);
                            SplashActivity.this.finish();
                        }    
                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }    
                        @Override
                        public void onNext(Integer value) {

                        }
                    });

I am using replay() with autoConnect() and subscribing a ReplaySubject, because I need to share a single subscription.

1 Answer

You can use doOnSubscribe before replay().autoConnect(), but you also need a way to ensure that the returned Observable is set up at most once:

final class OnceObservable {

    final AtomicReference<Observable<Integer>> ref = new AtomicReference<>();

    int x;
    int y;

    Observable<Integer> concat() {
        Observable<Integer> obs = ref.get();
        if (obs != null) {
            return obs;
        }

        obs = Observable.concatDelayError(
            Observable.fromCallable(() -> doSomething()),
            Observable.fromCallable(() -> doSomethingElse())
        )
        .doOnSubscribe(() -> {
            x = 10;
            y = 20;
        })
        .replay().autoConnect();

        if (!ref.compareAndSet(null, obs)) {
            obs = ref.get();
        }
        return obs;
    }
}
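
The "set up at most once" part does not depend on RxJava. Below is a minimal sketch of the same compareAndSet idea using only the JDK; the `Once` class and the `OnceDemo` driver are hypothetical names made up for illustration, not part of any library. It assumes that building a spare candidate is cheap and has no side effects, which should hold for an Observable pipeline that has been assembled but not yet subscribed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper (not part of RxJava): publishes a value at most once. */
final class Once<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final Supplier<T> factory;

    Once(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T current = ref.get();
        if (current != null) {
            return current; // fast path: a value was already published
        }
        // Racing threads may each build a candidate here.
        T candidate = factory.get();
        // Only the first compareAndSet succeeds; a loser drops its candidate
        // and returns the instance that won.
        return ref.compareAndSet(null, candidate) ? candidate : ref.get();
    }
}

public class OnceDemo {
    public static void main(String[] args) {
        AtomicInteger built = new AtomicInteger();
        Once<String> once = new Once<>(() -> "pipeline-" + built.incrementAndGet());

        String first = once.get();
        String second = once.get();

        // Both calls should hand back the very same instance.
        System.out.println(first == second);
        System.out.println("factory calls: " + built.get());
    }
}
```

Under contention the factory may run more than once, but every caller ends up with the same published instance, which is what a shared replay().autoConnect() needs.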
answered 2018-01-10T12:02:51.300