我正在尝试将 Retrofit 2 与 RxJava 和 Realm 结合起来,方法是把 Retrofit 返回的 Observable(服务调用的响应)保存到使用 Realm 的本地数据库中。但我得到了一个异常,提示从不正确的线程访问 Realm(Realm access from incorrect thread)。这是我的代码:
路线1:
// Route 1: request the user list from the REST API, try to persist it through a nested
// Realm observable inside doOnNext, then map the result and hand it to the view callbacks.
restApi.userEntityList()
// Convert the API response into Realm model objects.
.map(userEntityDataMapper::transformAllToRealm)
// Side effect executed for each emission before it continues downstream.
.doOnNext(userRealmModels -> {
if (userRealmModels != null){
// The Realm instance is obtained inside this callback and stored in the mRealm field.
// NOTE(review): the exception in the logcat below says Realm objects can only be accessed on
// the thread they were created on — confirm mRealm is never touched from another thread.
// NOTE(review): no close() call for this instance is visible in the snippet.
mRealm = Realm.getInstance(mContext);
// NOTE(review): no beginTransaction()/commitTransaction() is visible around the write below —
// confirm whether copyToRealmOrUpdate requires an enclosing write transaction.
mRealm.asObservable()
// NOTE(review): `userEntity` is not declared anywhere in this snippet; the lambda parameter is
// `userRealmModels` — presumably that is what should be saved; verify.
.map(realm -> mRealm.copyToRealmOrUpdate(userEntity))
// Nested subscription; its return value is discarded, so nothing visible here unsubscribes it.
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
// Errors of the nested observable are only printed; they are not forwarded to the outer chain.
e.printStackTrace();
}
@Override
public void onNext(Object o) {
Log.d("RealmManager", "user added!");
}
});
}})
// Map the Realm models emitted above into the List<User> consumed by the subscriber below.
.map(userEntityDataMapper::transformAll)
// Subscribe upstream on the io scheduler. The logcat stack trace below shows the doOnNext
// callback being invoked beneath OperatorSubscribeOn on a thread-pool thread.
.subscribeOn(Schedulers.io())
// Deliver the subscriber callbacks below on the Android main thread.
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<User>>() {
@Override
public void onCompleted() {
hideViewLoading();
}
@Override
public void onError(Throwable e) {
hideViewLoading();
// NOTE(review): unchecked cast — a Throwable that is not an Exception would raise a
// ClassCastException here.
showErrorMessage(new DefaultErrorBundle((Exception) e));
showViewRetry();
}
@Override
public void onNext(List<User> users) {
showUsersCollectionInView(users);
}
});
路线2:
// Route 2: same pipeline as the first attempt, but the write is done synchronously inside
// doOnNext, wrapped in an explicit Realm transaction.
restApi.userEntityList()
// Convert the API response into Realm model objects.
.map(userEntityDataMapper::transformAllToRealm)
// Side effect executed for each emission before it continues downstream.
.doOnNext(userRealmModels -> {
if (userRealmModels != null) {
// The Realm instance is obtained inside this callback and stored in the mRealm field.
// NOTE(review): the exception in the logcat below says Realm objects can only be accessed on
// the thread they were created on — confirm mRealm is never touched from another thread.
// NOTE(review): no close() call for this instance is visible in the snippet.
mRealm = Realm.getInstance(mContext);
mRealm.beginTransaction();
// NOTE(review): `userEntity` is not declared anywhere in this snippet; the lambda parameter is
// `userRealmModels` — presumably that is what should be saved; verify.
mRealm.copyToRealmOrUpdate(userEntity);
// NOTE(review): if the copy above throws, no cancelTransaction() is visible to roll back.
mRealm.commitTransaction();
}
})
// Map the Realm models emitted above into the List<User> consumed by the subscriber below.
.map(userEntityDataMapper::transformAll)
// Subscribe upstream on the io scheduler. The logcat stack trace below shows the doOnNext
// callback being invoked beneath OperatorSubscribeOn on a thread-pool thread.
.subscribeOn(Schedulers.io())
// Deliver the subscriber callbacks below on the Android main thread.
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<User>>() {
@Override
public void onCompleted() {
hideViewLoading();
}
@Override
public void onError(Throwable e) {
hideViewLoading();
// NOTE(review): unchecked cast — a Throwable that is not an Exception would raise a
// ClassCastException here.
showErrorMessage(new DefaultErrorBundle((Exception) e));
showViewRetry();
}
@Override
public void onNext(List<User> users) {
showUsersCollectionInView(users);
}
});
Logcat 日志:
W/System.err: java.lang.IllegalStateException: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:349)
at io.realm.BaseRealm.commitTransaction(BaseRealm.java:291)
at io.realm.Realm.commitTransaction(Realm.java:108)
at com.zeyad.cleanarchitecturet.data.db.RealmManagerImpl.put(RealmManagerImpl.java:66)
at com.zeyad.cleanarchitecturet.data.db.RealmManagerImpl.putAll(RealmManagerImpl.java:91)
at com.zeyad.cleanarchitecturet.data.repository.datasource.CloudUserDataStore$2.call(CloudUserDataStore.java:36)
at com.zeyad.cleanarchitecturet.data.repository.datasource.CloudUserDataStore$2.call(CloudUserDataStore.java:32)
at rx.Observable$11.onNext(Observable.java:4445)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:80)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:477)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:435)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:228)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at retrofit.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:113)
at retrofit.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:88)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
at java.lang.Thread.run(Thread.java:818)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: java.util.ArrayList.class
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:187)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:82)
... 29 more