我正在尝试实现 Redux 模式来管理 Jake Wharton 演示后的状态:http: //jakewharton.com/the-state-of-managing-state-with-rxjava/ 我希望流中的所有内容都运行在后台线程并在 AndroidMainThread 上接收输出。但是在当前设置下,我的订阅者会抛出一个异常,即我正在除 AndroidMainThread 之外的另一个线程上操作 UI。提前致谢。
Observable events = Observable.merge(Observable.just(new GetUsersEvent()/*, other event streams*/));
events.compose(mergeEvents(GetUsersEvent.class, /* other events */))
.compose(events -> events.map(event -> {
BaseAction action = null;
if (event instanceof GetUsersEvent)
action = new GetUsersAction();
return action;
})
.compose(actions -> actions.flatMap(action -> Observable.just(action)
.flatMap(action -> {
Observable result = Observable.empty();
if (action instanceof GetUsersAction)
result = userListVM.getUsers()
.subscribeOn(AndroidSchedulers.from(handlerThread.getLooper())); // I am using a handler thread to receive live updates from realm
return result;
})
.map(Result::successResult)
.onErrorReturn(Result::errorResult)
.startWith(Result.IN_FLIGHT)))
.scan(initialState, (currentUIModel, result) -> {
if (result.isLoading())
currentUIModel = UIModel.loadingState(bundle);
else if (result.isSuccessful())
currentUIModel = UIModel.successState(result);
else currentUIModel = UIModel.errorState(result.getError());
return currentUIModel;
})
.observeOn(AndroidSchedulers.mainThread()))
.subscribe(o -> {/* update UI */}, OnErrorNotImplementedException::new);
合并事件转换器:
public Observable.Transformer<BaseEvent, BaseEvent> mergeEvents(Class... classes) {
return events -> events.subscribeOn(Schedulers.io())
.publish(shared -> {
List<Class> classList = Arrays.asList(classes);
for (int i = 0, size = classList.size(); i < size; i++)
shared = shared.mergeWith(shared.ofType(classList.get(i)));
return shared;
});
}
堆栈跟踪:
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
at android.os.Handler.handleCallback(Handler.java:746)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5443)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)
Caused by: rx.exceptions.OnErrorNotImplementedException: Expected to be called on the main thread but was RxIoScheduler-2
at com.zeyad.usecases.app.components.mvvm.BaseSubscriber.onError(BaseSubscriber.java:36)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:153)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
at rx.internal.operators.OperatorTakeUntil$1.onError(OperatorTakeUntil.java:50)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:273)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216)
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
at android.os.Handler.handleCallback(Handler.java:746)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5443)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)
Caused by: java.lang.IllegalStateException: Expected to be called on the main thread but was RxIoScheduler-2
at rx.android.MainThreadSubscription.verifyMainThread(MainThreadSubscription.java:58)
at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:19)
at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:10)
at rx.Observable.unsafeSubscribe(Observable.java:10346)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10346)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10346)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10346)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:10346)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.fastPath(OnSubscribeFromArray.java:76)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:58)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscr