0

在我的 Android 应用程序中,我使用的是域级别的存储库接口,该接口由使用 SqlBrite 实现的本地数据库和带有 Retrofit 可观察对象的网络 api 支持。所以我getDomains(): Observable<List<Domain>>在 Repository 中有方法,在我的 Retrofit 和 SqlBrite 中有两个相应的方法。我不想连接或合并,或将这两个 observables 放在一起。我希望我的存储库只从 SqlBrite 获取数据,并且由于 SqlBrite 返回 QueryObservable,它在onNext()每次底层数据更改时触发,我可以独立运行我的网络请求并将结果存储到 SqlBrite 并使用从网络获取并存储到数据库数据来更新我的 Observable . 所以我尝试实现我的存储库的getDomains()方法如下:

fun getDomains(): Observable<List<Domain>> {
    return db.getDomains()
                   .doOnSubscribe {
                       networkClient.getDomains()
                                    .doOnNext { db.putDomains(it) }
                                    .onErrorReturn{ emptyList() }
                                    .subscribe()
                   }
}

但是在这种情况下,每次客户端应该订阅,每次它都会发出网络请求,那就不太好了。我考虑过其他do...操作员将请求移到那里,但是doOnCompleted()如果 QueryObservable 永远不会被调用,直到我在toBlocking()某个地方调用,我不会这样做,doOnEach()也不好,因为它每次从数据库中提取项目时都会发出请求。我也尝试使用replay()运算符,但尽管在这种情况下缓存了 Observable,但订阅会发生并导致网络请求。那么,如何以期望的方式组合这两个 Observable 呢?

4

2 回答 2

1

好的,这取决于您拥有的具体用例:即假设您想显示本地数据库中的最新数据,并且不时通过在后台执行网络请求来更新数据库。

也许有更好的方法,但也许你可以做这样的事情

 fun <T> createDataAwareObservable(databaseQuery: Observable<T>): Observable<T> =
      stateDeterminer.getState().flatMap {
        when (it) {
          State.UP_TO_DATE -> databaseQuery // Nothing to do, data is up to date so observable can be returned directly

          State.NO_DATA ->
            networkClient.getDomains() // no data so first do the network call
                .flatMap { db.save(it) } // save network call result in database
                .flatMap { databaseQuery } // continue with original observable

          State.SYNC_IN_BACKGROUND -> {
            // Execute sync in background
            networkClient.getDomains()
                .flatMap { db.save(it) }
                .observeOn(backgroundSyncScheduler)
                .subscribeOn(backgroundSyncScheduler)
                .subscribe({}, { Timber.e(it, "Error when starting background sync") }, {})

            // Continue with original observable in parallel, network call will then update database and thanks to sqlbrite databaseQuery will be update automatically
            databaseQuery
          }
        }
      }

所以最后你创建了你的 SQLBrite Observable (QueryObservable) 并将它传递给createDataAwareObservable()函数。如果这里没有数据,它将确保从网络加载数据,否则它将检查数据是否应该在后台更新(将其保存到数据库中,然后将自动更新 SQLBrite QueryObservable)或者数据是否最新。

基本上你可以像这样使用它:

createDataAwareObservable( db.getAllDomains() ).subscribe(...)

因此,对于您作为此用户的您,createDataAwareObservable()您始终会Observable<T>在作为参数传入时获得相同的类型。所以从本质上讲,您似乎一直在订阅db.getAllDomains()...

于 2016-06-27T07:48:51.757 回答
0

如果您的问题是每次想要获取数据时都必须订阅观察者,您可以使用中继,它永远不会取消订阅观察者,因为没有实现 onComplete

   /**
 * Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
 * It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
 * and all the next items passed to the pipeline.
 */
@Test
public void testRelay() throws InterruptedException {
    BehaviorRelay<String> relay = BehaviorRelay.create("default");
    relay.subscribe(result -> System.out.println("Observer1:" + result));
    relay.call("1");
    relay.call("2");
    relay.call("3");
    relay.subscribe(result -> System.out.println("Observer2:" + result));
    relay.call("4");
    relay.call("5");
}

这里的另一个例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

于 2016-06-26T18:05:32.207 回答