如何实现flatMapFirst
类似的运算符flatMap
,但只有在前一个结束时才添加新的 observable?如果前一个仍在运行,它只会忽略新的 observable。如何在 RxJava 2 中实现它?
它已经存在于 bacon - flatMapFirst 和 kefir - flatMapFirst中。
如何实现flatMapFirst
类似的运算符flatMap
,但只有在前一个结束时才添加新的 observable?如果前一个仍在运行,它只会忽略新的 observable。如何在 RxJava 2 中实现它?
它已经存在于 bacon - flatMapFirst 和 kefir - flatMapFirst中。
您不需要新的运算符,而是现有运算符的组合:
source.onBackpressureLatest().flatMap(function, 1)
FlatMap 将一次运行 1 个内部源,如果在 flatMap 运行 1 个内部源时没有需求,onBackpressureLatest 将继续丢弃外部源值(最新的除外)。
如果您不想继续使用最新的源代码,请考虑onBackpressureDrop
改用。
大卫回答的后续行动。如果您正在寻找一种flatMapFirst
在 RxJava 2 中使用的方法Observable
,而不是Flowable
,这里是一个快速的 Kotlin 实现:
fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
toFlowable(BackpressureStrategy.DROP)
.flatMap({ transform(it).toFlowable(BackpressureStrategy.BUFFER) }, 1)
.toObservable()
UPD。基于 David Karnok 的建议的替代实现:
fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
Observable.defer {
val busy = AtomicBoolean()
return@defer this
.filter { busy.compareAndSet(false, true) }
.flatMap {
transform(it).doAfterTerminate { busy.set(false) }
}
}
我设法解决了这个问题:
/**
* Flatmaps upstream items into [source] items.
* Ignores upstream items if there is any [source] instance currently running.
*
* ```
* upstream ----u-----u---u-------u---------------|-->
* ↓ ↓ ↓
* source ---s-------|-> ---s-------|-> ↓
* ↓ ↓ ↓
* result -------s-----------------s------------|-->
* ```
*/
fun <T, R> Observable<T>.flatMapWithDrop(source: Observable<R>): Observable<R> {
return this.toFlowable(BackpressureStrategy.DROP)
.flatMap({ source.toFlowable(BackpressureStrategy.MISSING) }, 1)
.toObservable()
}