I have an API call which verifies some status against an "Id". The API returns Single or error. I have a list of such Id's, Only one Id is valid to return success or none (all id's return error). What I need is, Iterate through each Id and skip the errors from API call, until either a success or end of the list. I am able to achieve this sequentially. However, I am trying to do the same, using ParallelFlowable. It works fine when an Id returns success, But when there is no id which returns success (all ids fail), then it just skip all the errors from API, but does not notify the subscriber after all the ids are validated. I am not sure how to handle this.
// API call
fun getStatus(Id: String): Single<String> {
//... returns Single<String> or error
}
//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
Observable.fromIterable(ids)
.flatMapSingle { id ->
getStatus(id)
.onErrorResumeWith { singleSource ->
if (ids.last() == id)) { //If this is last item in list, return error
singleSource.onError(NoStatusFoundException())
} else {
// Skip errors until valid id is found or till the list reached end.
Flowable.empty<String>()
}
}
}.firstOrError()
}
// Parallel Flow, How to identify the list is completed and return NoStatusFoundException in case of all id's fail?
fun getStatus(ids: List<String>): Single<String> {
Flowable.fromIterable(ids)
.parallel()
.runOn(io())
.flatMap{ id -> getStatus(id).toFlowable()
.onErrorResumeWith { Flowable.empty<String>() }
}
.sequentialDelayError()
.firstOrError()
.onErrorResumeNext { Single.error(it) }
}
// Subscription
getStatus(listOf("1","2","3","4","5",))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscriber({ id->
// success
this is notified when an id is success
},
{ // error handler - Need help here
Never notified when all the id's fail?
})