
I have an API call that verifies some status against an ID. The call returns a Single that either succeeds or fails with an error. I have a list of such IDs, and at most one of them is valid: either exactly one ID returns success, or none does (every ID returns an error). I need to iterate through the IDs, skipping errors from the API call, until either one succeeds or the list is exhausted. I can do this sequentially, but I am trying to do the same with ParallelFlowable. It works when an ID returns success. When every ID fails, however, the errors are skipped but the subscriber is never notified once all the IDs have been checked. I am not sure how to handle this.

// API call
fun getStatus(Id: String): Single<String> {
  //... returns Single<String> or error
}

// Sequential flow, working
fun getStatus(ids: List<String>): Single<String> {
    return Observable.fromIterable(ids)
        .flatMapSingle { id ->
            getStatus(id)
                .onErrorResumeWith { singleSource ->
                    if (ids.last() == id) { // If this is the last item in the list, return an error
                        singleSource.onError(NoStatusFoundException())
                    } else {
                        // Skip errors until a valid id is found or the end of the list is reached.
                        Flowable.empty<String>()
                    }
                }
        }.firstOrError()
}

// Parallel flow: how do I detect that the list is exhausted and return NoStatusFoundException when all the ids fail?
fun getStatus(ids: List<String>): Single<String> {
    return Flowable.fromIterable(ids)
        .parallel()
        .runOn(io())
        .flatMap { id ->
            getStatus(id).toFlowable()
                .onErrorResumeWith { Flowable.empty<String>() }
        }
        .sequentialDelayError()
        .firstOrError()
        .onErrorResumeNext { Single.error(it) }
}

// Subscription
getStatus(listOf("1", "2", "3", "4", "5"))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ id ->
        // Success: this is notified when an id succeeds
    },
    {
        // Error handler (need help here): never notified when all the ids fail
    })


3 Answers


I was able to solve this by removing onErrorResumeWith { Flowable.empty<String>() } from the flatMap and installing RxJavaPlugins.setErrorHandler {...}. sequentialDelayError() delays all errors until every rail has finished its work.

fun getStatus(ids: List<String>): Single<String> {
    return Flowable.fromIterable(ids)
        .parallel()
        .runOn(io())
        .flatMap { id -> getStatus(id).toFlowable() }
        .sequentialDelayError()
        .firstOrError()
        .onErrorResumeNext { Single.error(it) }
}

///
RxJavaPlugins.setErrorHandler { e: Throwable ->
    when (e) {
        is UndeliverableException -> {...}
        .....
    }
}
Answered 2020-05-13T01:46:46.370

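You are returning Flowable.empty(), which completes the subscription immediately. From the documentation:

Returns a Flowable that emits no items to the {@link Subscriber} and immediately invokes its {@link Subscriber#onComplete onComplete} method.

When an error occurs, perhaps you could return Flowable.just("") instead, or supply some other expected value.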

.onErrorResumeWith { Flowable.just("") }
Answered 2020-05-07T08:49:36.370

The problem is in this line:

.onErrorResumeWith { Flowable.empty<String>() }

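The parameter of onErrorResumeWith is a Publisher<out T>, not a () -> Publisher<out T>. The Publisher interface has exactly one method, void subscribe(Subscriber<? super T> s), so it qualifies for SAM conversion.

The lambda { Flowable.empty<String>() } is therefore a perfectly valid (Subscriber<String>) -> Unit: it ignores its single argument, calls a method, and discards the result. This compiles, but for all practical purposes the result is the same as Flowable.never().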
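
The effect can be reproduced without RxJava. The following is only a minimal sketch: MiniSubscriber, MiniPublisher, emptyPublisher and completes are hypothetical stand-ins invented for illustration, not the real Reactive Streams or RxJava types. It assumes Kotlin 1.4 or later, where SAM conversion also applies to Kotlin fun interfaces.

```kotlin
// Hypothetical stand-ins for Subscriber and Publisher (NOT the real Reactive Streams types).
interface MiniSubscriber<T> {
    fun onComplete()
}

fun interface MiniPublisher<T> {
    fun subscribe(subscriber: MiniSubscriber<T>)
}

// A publisher that completes immediately, playing the role of Flowable.empty().
fun <T> emptyPublisher(): MiniPublisher<T> = MiniPublisher<T> { it.onComplete() }

// Subscribes to the fallback and reports whether onComplete was ever signalled.
fun completes(fallback: MiniPublisher<String>): Boolean {
    var completed = false
    fallback.subscribe(object : MiniSubscriber<String> {
        override fun onComplete() {
            completed = true
        }
    })
    return completed
}

fun main() {
    // Lambda form: the lambda body BECOMES subscribe(). The publisher it creates
    // is discarded, so the subscriber is never told anything.
    println("lambda completes: ${completes { emptyPublisher<String>() }}") // false

    // Direct form: the publisher itself is passed, so its subscribe() runs.
    println("direct completes: ${completes(emptyPublisher())}") // true
}
```

In the lambda form nothing ever calls onComplete, which is why the fallback behaves like a source that never terminates.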

Instead of a lambda, you need to pass Flowable.empty() directly to onErrorResumeWith():

            .flatMap{ id -> getStatus(id).toFlowable()
                      .onErrorResumeWith(Flowable.empty<String>())
                    }
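
Putting this together, the whole parallel flow might look like the sketch below. It is not a definitive implementation. It assumes RxJava 3 is on the classpath. The single-id getStatus is a hypothetical stub standing in for the real API call. NoStatusFoundException is declared here only so the sample compiles. Translating the NoSuchElementException that firstOrError() signals on an empty stream into NoStatusFoundException is an assumption about what the question wants.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers

// Assumed definition; the question only mentions the exception by name.
class NoStatusFoundException : RuntimeException("No id returned a status")

// Hypothetical stub for the real API call: only id "3" succeeds.
fun getStatus(id: String): Single<String> =
    if (id == "3") Single.just("status-$id")
    else Single.error(IllegalStateException("invalid id $id"))

fun getStatus(ids: List<String>): Single<String> =
    Flowable.fromIterable(ids)
        .parallel()
        .runOn(Schedulers.io())
        .flatMap { id ->
            getStatus(id)
                .toFlowable()
                // Pass the Publisher itself (parentheses, not braces) so a failed
                // id completes empty instead of never terminating.
                .onErrorResumeWith(Flowable.empty<String>())
        }
        .sequentialDelayError()
        // Signals NoSuchElementException if every id completed empty.
        .firstOrError()
        .onErrorResumeNext { error ->
            if (error is NoSuchElementException) {
                Single.error<String>(NoStatusFoundException())
            } else {
                Single.error<String>(error)
            }
        }

fun main() {
    // One valid id: the success value reaches the caller.
    println(getStatus(listOf("1", "2", "3", "4", "5")).blockingGet())

    // No valid id: the error path is now reached instead of hanging.
    val failure = runCatching { getStatus(listOf("1", "2")).blockingGet() }.exceptionOrNull()
    println(failure is NoStatusFoundException)
}
```

Because every rail now terminates, firstOrError() can observe the empty stream and the subscriber's error handler is invoked when all the ids fail.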
Answered 2020-05-08T22:03:25.810