20

我有一个房间持久数据库插入方法,如下所示:

@Dao
public interface CountriesDao{

    @Insert(onConflict = REPLACE)
    List<Long> addCountries(List<CountryModel> countryModel);
}

我意识到这不能在主线程上运行。这是我定义数据库的方式:

Room.inMemoryDatabaseBuilder(context.getApplicationContext(), MyDatabase.class).build();

我正在尝试使用 rxjava2,这样我就不会在主线程上运行。我创建了以下方法:

public void storeCountries(List<CountryModel> countriesList) {
        Observable.just(db.countriesDao().addCountries(countriesList))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DefaultSubscriber<List<Long>>(){
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                super.onSubscribe(d);
            }

            @Override
            public void onNext(@NonNull List<Long> longs) {
                super.onNext(longs);
                Timber.d("insert countries transaction complete");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                super.onError(e);
                Timber.d("error storing countries in db"+e);
            }

            @Override
            public void onComplete() {
                Timber.d("insert countries transaction complete");
            }
        });
    }

对我来说,这显然现在正在另一个线程上运行。不是主线程,但是当我运行此代码时,出现以下错误:

完整的堆栈跟踪如下。为什么会这样?

进程:com.mobile.myapp.staging,PID:12990
java.lang.IllegalStateException:调度程序抛出致命异常。原因:java.lang.IllegalStateException:无法访问主线程上的数据库,因为它可能会长时间锁定 UI。在 io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111) 在 android.os.Handler.handleCallback(Handler.java:751) 在 android.os.Handler.dispatchMessage(Handler.java:95 ) 在 android.os.Looper.loop(Looper.java:

不重要,但如果您需要知道 defaultSubscriber 类在这里的样子,它是:

DefaultSubscriber.java

public class DefaultSubscriber<T> implements Observer<T> {

Disposable disposable;

@Override
public void onSubscribe(@NonNull Disposable d) {
    disposable = d;
}

@Override
public void onNext(@NonNull T t) {

}

@Override
public void onError(@NonNull Throwable e) {
    Timber.e(e);
}

@Override
public void onComplete() {

}

public void unsubscribe(){
    if(disposable!=null && !disposable.isDisposed()){
        disposable.dispose();
    }
  }
}
4

6 回答 6

33

这是一个常见的错误:just()不会执行括号内的“代码”,因为它just需要一个值,而不是计算。你需要fromCallable

Observable.fromCallable(() -> db.countriesDao().addCountries(countriesList))
于 2017-06-16T10:59:54.960 回答
17

更好的是,您可以使用Completable. 其描述:表示没有任何值但仅指示完成异常的计算。

Completable.fromAction(() -> db.countriesDao().addCountries(list)).subscribe();
于 2018-01-17T17:54:06.300 回答
12

注意:Room 不支持在主线程上访问数据库,除非您在构建器上调用了 allowMainThreadQueries(),因为它可能会长时间锁定 UI。异步查询 - 返回 LiveData 或 Flowable 实例的查询不受此规则的约束,因为它们在需要时在后台线程上异步运行查询。

所以你的代码可以是这样的

Completable.fromAction(() -> db.countriesDao()
                .addCountries(list))
                .subscribeOn(Schedulers.io())
                .subscribe();
于 2018-06-29T14:55:54.587 回答
5

从房间房间2.1.0-alpha02,您可以在插入时 使用 ( Completeable, Single, ) ( https://medium.com/androiddevelopers/room-rxjava-acb0cd4f3757 )Maybe

例子

@Dao
interface UserDao{
     @Insert
     Completable insert(final User user); // currently, we must put final before user variable or you will get error when compile
}

使用

db.userDao().insert(user).subscribeOn(Schedulers.io()).subscribe(new Action() {
    @Override
    public void run() throws Exception {
        // success
    }
}, new Consumer < Throwable > () {
    @Override
    public void accept(Throwable throwable) throws Exception {
        // error
    }
});
于 2018-11-19T06:30:32.340 回答
1

您还可以使用 Single observable

        Single.create(new SingleOnSubscribe<List<Long>>() {
        @Override
        public void subscribe(SingleEmitter<List<Long>> emitter) throws Exception {
            try {
                List<Long> ids = db.countriesDao().addCountries(countriesList);
                emitter.onSuccess(ids);
            } catch (Throwable t) {
                emitter.onError(t);
            }
        }})
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribeWith(new DisposableSingleObserver<Long>() {
        @Override
        public void onSuccess(List<Long> ids) {

        }

        @Override
        public void onError(Throwable e) {

        }
});
于 2017-12-06T19:04:21.097 回答
0

确保您已经添加了两个依赖项,“implementation 'io.reactivex.rxjava3:rxjava:3.0.0' implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'”

于 2021-03-11T15:51:23.920 回答