使用 kotlin,有代码
fun fetchRemoteDataApi(): Single<RemoteDataResponse> = networkApi.getData()
// it is just a retrofit
@GET(".../api/getData")
fun getData() : Single<RemoteDataResponse>
fun mergeApiWithDb(): Completable = fetchRemoteDataApi()
.zipWith(localDao.getAll())
.flatMapCompletable { (remoteData, localData) ->
doMerge(remoteData, localData) //<== return a Completable
}
代码流程:
val mergeApiDbCall = mergeApiWithDb().onErrorComplete().cache() //<=== would like do some inspection at this level
PublishSubject.create<Unit>().toFlowable(BackpressureStrategy.LATEST)
.compose(Transformers.flowableIO())
.switchMap {
//merge DB with api, or local default value first then listen to DB change
mergeApiDbCall.andThen(listAllTopics())
.concatMapSingle { topics -> remoteTopicUsers.map { topics to it } }
}
.flatMapCompletable { (topics, user) ->
// do something return Completable
}
.subscribe({
...
}, { throwable ->
...
})
以及打电话的时候
val mergeApiDbCall = mergeApiWithDb().onErrorComplete().cache()
问题是是否要检查Singles<RemoteDataResponse>
返回的fetchRemoteDataApi()
(即使用 Log.i(...) 打印输出的内容RemoteDataResponse
等),无论是在出错还是成功的情况下,怎么做?
/// the functions
fun listAllTopics(): Flowable<List<String>> = localRepoDao.getAllTopics()
// which a DAO:
@Query("SELECT topic FROM RemoteDataTable WHERE read = 1")
fun getAllTopics(): Flowable<List<String>>
///
private val remoteTopicUsers: Single<List<User>>
get() {
return Single.create {
networkApi.getTopicUsers(object : ICallback.IGetTopicUsersCallback {
override fun onSuccess(result: List<User>) = it.onSuccess(result)
override fun onError(errorCode: Int, errorMsg: String?) = it.onError(Exception(errorCode, errorMsg))
})
}
}