调用 syncUsers() 方法时出现错误。
rx.exceptions.OnErrorNotImplementedException at rx.Observable$27.onError(Observable.java:8139)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:325)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:333)
at io.realm.rx.RealmObservableFactory$5$1.onChange(RealmObservableFactory.java:143)
at io.realm.rx.RealmObservableFactory$5$1.onChange(RealmObservableFactory.java:139)
at io.realm.RealmResults.notifyChangeListeners(RealmResults.java:1010)
at io.realm.RealmResults.notifyChangeListeners(RealmResults.java:996)
at io.realm.HandlerController.notifyRealmResultsCallbacks(HandlerController.java:303)
at io.realm.HandlerController.notifySyncRealmResultsCallbacks(HandlerController.java:284)
at io.realm.HandlerController.notifyTypeBasedListeners(HandlerController.java:275)
at io.realm.HandlerController.notifyAllListeners(HandlerController.java:262)
at io.realm.HandlerController.realmChanged(HandlerController.java:385)
at io.realm.HandlerController.handleMessage(HandlerController.java:116)
at android.os.Handler.dispatchMessage(Handler.java:98)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:352)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:331)
at io.realm.rx.RealmObservableFactory$5$1.onChange(RealmObservableFactory.java:143)
at io.realm.rx.RealmObservableFactory$5$1.onChange(RealmObservableFactory.java:139)
at io.realm.RealmResults.notifyChangeListeners(RealmResults.java:1010)
at io.realm.RealmResults.notifyChangeListeners(RealmResults.java:996)
at io.realm.HandlerController.notifyRealmResultsCallbacks(HandlerController.java:303)
at io.realm.HandlerController.notifySyncRealmResultsCallbacks(HandlerController.java:284)
at io.realm.HandlerController.notifyTypeBasedListeners(HandlerController.java:275)
at io.realm.HandlerController.notifyAllListeners(HandlerController.java:262)
at io.realm.HandlerController.realmChanged(HandlerController.java:385)
at io.realm.HandlerController.handleMessage(HandlerController.java:116)
at android.os.Handler.dispatchMessage(Handler.java:98)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
问题似乎来自第一次操作。
public Observable<Void> syncUsers() {
return client.getInstance().getUsers()
.flatMap(new Func1<Users, Observable<Users>>() {
@Override
public Observable<Users> call(final Users users) {
users.setIdentifier(PrefHelper.getUserName()); //primary key
final Realm realm = Realm.getDefaultInstance();
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
realm.copyToRealmOrUpdate(users);
}
});
realm.close();
return Observable.just(users);
}
})
.flatMap(new Func1<Users, Observable<Void>>() {
@Override
public Observable<Void> call(final Users users) {
if (users != null && users.getUsers() != null) {
for (final User user : users.getUsers()) {
Realm realm = Realm.getDefaultInstance();
realm.executeTransaction(new Realm.Transaction() {
@Override
public void execute(Realm realm) {
UserFactory.attachParentUser(user);
realm.copyToRealmOrUpdate(user);
}
});
realm.close();
}
}
return Observable.empty();
}
});
}
和电话:
mGetUsersSubscription = new GetUsersRequest(getApplicationContext(), mCurrentUser != null ? mCurrentUser.getIdentifier() : mUserIdToSearch, false).execute()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Void>() {
@Override
public void onCompleted() {
//Do nothing
}
@Override
public void onError(Throwable e) {
mFilesEmptyView.showError();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(Void aVoid) {
mSwipeRefreshLayout.setRefreshing(false);
refreshDataFromDB();
}
});