I have code like this:
Process X:
getLocationObservable() // ---> async operation that fetches the location.
    // Once the location is found (or fails to be found), it is sent to this filter:
    .filter(location -> {
        // --- Operation A ---
        // After finishing Operation A, I either return 'true' and continue
        // to the next observable, which is a Retrofit server call, or simply
        // return 'false' and quit.
    })
    .flatMap(location -> getRetrofitServerCallObservable(location)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        new Observer<MyCustomResponse>() {
            @Override
            public void onSubscribe(Disposable d) {
                _disposable = d;
            }
            @Override
            public void onNext(MyCustomResponse response) {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
In the Location class it looks like this:
private PublishSubject<Location> locationPublishSubject = PublishSubject.create();
public Observable<Location> getLocationObservable() {
    return locationPublishSubject;
}
and then...
locationPublishSubject.onNext( foundLocation );
Problems with PublishSubject:
- If I call
locationPublishSubject.onError( new <some mock exception> )
--> it crashes with io.reactivex.exceptions.UndeliverableException
- If I call
locationPublishSubject.onComplete
after onNext, then onNext does not happen. It jumps straight to onComplete.
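
For reference, the second symptom can be modelled in isolation. According to its Javadoc, PublishSubject multicasts items only to observers that are subscribed at that moment, while terminal events are also relayed to late observers. Since the chain above uses subscribeOn(Schedulers.io()), the actual subscription may happen after onNext has already been called; this is an assumption about the timing, not a confirmed diagnosis. The sketch below is a hand-rolled stand-in (TinySubject and LateSubscriberSketch are hypothetical names, and this is not RxJava's implementation) that only reproduces the "late subscriber sees onComplete but no onNext" behaviour using the standard library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot subject; NOT RxJava's PublishSubject implementation.
final class TinySubject<T> {
    private final List<Consumer<T>> onNextHandlers = new ArrayList<>();
    private final List<Runnable> onCompleteHandlers = new ArrayList<>();
    private boolean completed = false;

    void subscribe(Consumer<T> onNext, Runnable onComplete) {
        if (completed) {
            // Late subscriber: earlier items are gone, only the terminal event is relayed.
            onComplete.run();
            return;
        }
        onNextHandlers.add(onNext);
        onCompleteHandlers.add(onComplete);
    }

    void onNext(T item) {
        if (completed) return;
        // Items reach only the observers subscribed right now; nothing is buffered.
        for (Consumer<T> handler : onNextHandlers) handler.accept(item);
    }

    void onComplete() {
        if (completed) return;
        completed = true;
        for (Runnable handler : onCompleteHandlers) handler.run();
    }
}

class LateSubscriberSketch {
    // Emits before anyone subscribes, then subscribes; returns what the observer saw.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        TinySubject<String> subject = new TinySubject<>();
        subject.onNext("foundLocation"); // no observer yet, so this item is dropped
        subject.onComplete();
        subject.subscribe(v -> events.add("onNext " + v), () -> events.add("onComplete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onComplete]
    }
}
```

If the timing assumption holds, the same ordering in the real chain would show up as a log line from onComplete with none from onNext.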