6

I'm implementing network API with the combination of RxJava and Retrofit, and I use Realm as my database. I got it pretty much working but I'm wondering if it is the correct approach and flow of events. So, here is the RetrofitApiManager.

public class RetrofitApiManager {

    private static final String BASE_URL = "***";

    private final ShopApi shopApi;

    public RetrofitApiManager(OkHttpClient okHttpClient) {

        // GSON INITIALIZATION

        Retrofit retrofit = new Retrofit.Builder()
                .client(okHttpClient)
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create(gson))
                .baseUrl(BASE_URL)
                .build();

        shopApi = retrofit.create(ShopApi.class);
    }

    public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> {
                    Realm realm = Realm.getDefaultInstance();
                    realm.executeTransaction(realm1 -> 
                            realm1.copyToRealmOrUpdate(response.shops));
                    realm.close();
                })
                .flatMap(response -> {
                    Realm realm = Realm.getDefaultInstance();
                    Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    realm.close();
                    return results;
                });
    }
}

And here is the call to get RealmResults<Shop> inside a Fragment.

realm.where(Shop.class)
        .findAllAsync()
        .asObservable()
        .filter(RealmResults::isLoaded)
        .first()
        .flatMap(shops -> 
                shops.isEmpty() ? retrofitApiManager.getShops() : Observable.just(shops))
        .subscribe(
                shops -> initRecyclerView(),
                throwable -> processError(throwable));

Here are my questions:

  1. Is it a correct approach to chain events like in the example above or should I manage them in a different way?

  2. Is it OK to useRealm instance in getShops() method and close i there or would it be better to pass it as an argument and then manage it somehow? Although, this idea seems to be a bit problematic with threads and calling Realm.close() always at the right time.

4

3 回答 3

11

1)我会尽可能多地在后台线程上做,现在你在 UI 线程上做了很多工作。

2)

  public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> 
                            realm1.insertOrUpdate(response.shops));
                    } // auto-close
                })
                .flatMap(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    } // auto-close
                    return results;
                });
    }

所有 Realm 数据都是延迟加载的,因此它仅在 Realm 实例打开时可用,因此在检索后关闭它很有可能无法正常工作。在您的情况下,虽然您在主线程上进行平面映射,但很可能那里已经有一个打开的实例。

如果您愿意,您可以使用copyFromRealm()获取可以跨线程移动且不再连接到 Realm 的非托管数据,但它们也会失去实时更新功能并占用更多内存。

它可能会这样做:

  public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .doOnNext(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> 
                            realm1.copyToRealmOrUpdate(response.shops));
                    } // auto-close
                })
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(response -> {
                    Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    return results;
                });

或者,您可以将网络请求视为副作用,并仅依赖 Realm 在发生更改时通知您(更好的方法是 IMO,因为您将网络与 DB 访问分开,这就是存储库模式的意义所在)

public Observable<RealmResults<Shop>> getShops() {
    // Realm will automatically notify this observable whenever data is saved from the network
    return realm.where(Shop.class).findAllAsync().asObservable()
            .filter(RealmResults::isLoaded)
            .doOnNext(results -> {
                if (results.size() == 0) {
                    loadShopsFromNetwork();
                }
            }); 
}

private void loadShopsFromNetwork() {
    shopApi.getShops()
            .subscribeOn(Schedulers.io())
            .subscribe(response -> {
                try(Realm realm = Realm.getDefaultInstance()) {
                    realm.executeTransaction(r -> r.insertOrUpdate(response.shops));
                } // auto-close
            });
}
于 2016-06-28T07:19:21.233 回答
1

Christian Melchior 在他的回答中提到的内容非常有意义,并且应该可以解决您手头的问题,但这种方法可能会引入其他问题。

在一个好的架构中,所有主要模块(或库)都应该与其余代码隔离开来。由于 Realm、RealmObject 或 RealmResult 不能跨线程传递,因此将 Realm 和 Realm 相关操作与其余代码隔离开来更为重要。

对于每个 jsonModel 类,您应该有一个 realmModel 类和一个 DAO(数据访问对象)。这里的想法是,除了 DAO 类之外,没有一个类必须知道或访问 realmModel 或 Realm。DAO 类接受 jsonModel,转换为 realmModel,执行读/写/编辑/删除操作,对于读操作,DAO 将 realmModel 转换为 jsonModel 并返回。

这种方式很容易维护 Realm,避免所有 Thread 相关的问题,易于测试和调试。

这是一篇关于具有良好架构的 Realm 最佳实践的文章https://medium.com/@Viraj.Tank/realm-integration-in-android-best-practices-449919d25f2f

也是一个示例项目,演示了在 Android 上 Realm 与 MVP(模型视图演示器)、RxJava、Retrofit、Dagger、注释和测试的集成。https://github.com/viraj49/Realm_android-injection-rx-test

于 2016-07-01T11:45:50.533 回答
1

就我而言,我似乎已经为RealmRecyclerViewAdapter这样的查询定义了:

    recyclerView.setAdapter(new CatAdapter(getContext(),
            realm.where(Cat.class).findAllSortedAsync(CatFields.RANK, Sort.ASCENDING)));

并且在其他情况下定义了一个使用 RxJava 进行改造的条件,以便在满足条件时下载更多内容:

    Subscription downloadCats = Observable.create(new RecyclerViewScrollBottomOnSubscribe(recyclerView))
            .filter(isScrollEvent -> isScrollEvent || realm.where(Cat.class).count() <= 0)
            .switchMap(isScrollEvent -> catService.getCats().subscribeOn(Schedulers.io()))  // RETROFIT
            .retry()
            .subscribe(catsBO -> {
                try(Realm outRealm = Realm.getDefaultInstance()) {
                    outRealm.executeTransaction((realm) -> {
                        Cat defaultCat = new Cat();
                        long rank;
                        if(realm.where(Cat.class).count() > 0) {
                            rank = realm.where(Cat.class).max(Cat.Fields.RANK.getField()).longValue();
                        } else {
                            rank = 0;
                        }
                        for(CatBO catBO : catsBO.getCats()) {
                            defaultCat.setId(catBO.getId());
                            defaultCat.setRank(++rank);
                            defaultCat.setSourceUrl(catBO.getSourceUrl());
                            defaultCat.setUrl(catBO.getUrl());
                            realm.insertOrUpdate(defaultCat);
                        }
                    });
                }
            }, throwable -> {
                Log.e(TAG, "An error occurred", throwable);
            });

例如,这是基于编辑文本输入的搜索:

    Subscription filterDogs = RxTextView.textChanges(editText)
                     .switchMap((charSequence) -> 
                           realm.where(Dog.class)
                                .contains(DogFields.NAME, charSequence.toString())
                                .findAllAsyncSorted(DogFields.NAME, Sort.ASCENDING)
                                .asObservable())
                     .filter(RealmResults::isLoaded) 
                     .subscribe(dogs -> realmRecyclerAdapter.updateData(dogs));
于 2016-09-06T22:19:24.370 回答