How can I implement Observable.concatEagerDelayError, or an equivalent, in RxJava2/RxKotlin2?

There is:

  • Observable.concatEager
  • Observable.concatDelayError

but there is no:

  • Observable.concatEagerDelayError

Here is what I have:

fun getAll(): Observable<List<User>> = Observable.concatArrayDelayError(
    // from db
    userDAO
        .selectAll()
        .subscribeOn(ioScheduler),
    // from api
    userAPI
        .getAll()
        .doOnNext { lstUser -> Completable.concatArray(
            userDAO.deleteAll().subscribeOn(ioScheduler),
            userDAO.save(lstUser).subscribeOn(ioScheduler)
        ) }
        .subscribeOn(ioScheduler)
)

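I want the same behavior, but with selectAll() and getAll() both started eagerly, because there is no reason to wait for the db before starting the network call.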

1 Answer

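Use concatMapEagerDelayError. It subscribes to all the inner sources eagerly, still emits their items in source order, and with the second argument (tillTheEnd) set to true it holds back any error until every source has terminated: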

 Observable.fromIterable(sources)
 .concatMapEagerDelayError(v -> v, true);

 Observable.fromArray(source1, source2, source3)
 .concatMapEagerDelayError(v -> v, true);

Javadoc

Edit:

fun getAll(): Observable<List<User>> = Observable.fromArray(
    // from db
    userDAO
        .selectAll()
        .subscribeOn(ioScheduler),
    // from api
    userAPI
       .getAll()
       // --- this makes no sense by the way -------------------
       .doOnNext { lstUser -> Completable.concatArray(
            userDAO.deleteAll().subscribeOn(ioScheduler),
            userDAO.save(lstUser).subscribeOn(ioScheduler)
       )}
       // ------------------------------------------------------
       .subscribeOn(ioScheduler)
)
.concatMapEagerDelayError({ v -> v }, true)
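
On the part flagged in the comment above: doOnNext only builds the Completable and never subscribes to it, so as written neither deleteAll() nor save() would run. Below is a minimal sketch of one way to chain the cache refresh into the stream instead. It assumes that deleteAll() and save() return Completable and that selectAll() and getAll() return Observable<List<User>>, which the question's calls suggest but do not show. The User fields, the two interfaces, and the UserRepository class are hypothetical stand-ins, not the asker's actual types.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Scheduler

// Hypothetical model; the question does not show User's fields.
data class User(val id: Long, val name: String)

// Hypothetical shapes inferred from the calls in the question.
interface UserDAO {
    fun selectAll(): Observable<List<User>>
    fun deleteAll(): Completable
    fun save(users: List<User>): Completable
}

interface UserAPI {
    fun getAll(): Observable<List<User>>
}

// Hypothetical wrapper class, only here to make the sketch self-contained.
class UserRepository(
    private val userDAO: UserDAO,
    private val userAPI: UserAPI,
    private val ioScheduler: Scheduler
) {
    fun getAll(): Observable<List<User>> = Observable.fromArray(
        // from db
        userDAO
            .selectAll()
            .subscribeOn(ioScheduler),
        // from api
        userAPI
            .getAll()
            // concatMap subscribes to the returned chain, so the cache refresh
            // actually runs; the list is re-emitted once save() has completed.
            .concatMap { lstUser ->
                userDAO.deleteAll().subscribeOn(ioScheduler)
                    .andThen(userDAO.save(lstUser).subscribeOn(ioScheduler))
                    .andThen(Observable.just(lstUser))
            }
            .subscribeOn(ioScheduler)
    )
    // Both sources are subscribed to right away, db results are still emitted
    // first, and any error is delivered only after both sources have terminated.
    .concatMapEagerDelayError({ v -> v }, true)
}
```

With this variant a failure in deleteAll() or save() surfaces as an error of the API branch. If a failed cache write should not fail the stream, one option is to append onErrorComplete() to the Completable chain before andThen(Observable.just(lstUser)).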

answered 2019-03-14T12:50:29.817