1

如何实现flatMapFirst类似的运算符flatMap,但只有在前一个结束时才添加新的 observable?如果前一个仍在运行,它只会忽略新的 observable。如何在 RxJava 2 中实现它?

它已经存在于 bacon - flatMapFirst 和 kefir - flatMapFirst中。

在此处输入图像描述

4

3 回答 3

4

您不需要新的运算符,而是现有运算符的组合:

source.onBackpressureLatest().flatMap(function, 1)

FlatMap 将一次运行 1 个内部源,如果在 flatMap 运行 1 个内部源时没有需求,onBackpressureLatest 将继续丢弃外部源值(最新的除外)。

如果您不想继续使用最新的源代码,请考虑onBackpressureDrop改用。

于 2017-01-24T13:03:18.360 回答
0

大卫回答的后续行动。如果您正在寻找一种flatMapFirst在 RxJava 2 中使用的方法Observable,而不是Flowable,这里是一个快速的 Kotlin 实现:

fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    toFlowable(BackpressureStrategy.DROP)
        .flatMap({ transform(it).toFlowable(BackpressureStrategy.BUFFER) }, 1)
        .toObservable()

UPD。基于 David Karnok 的建议的替代实现:

fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    Observable.defer {
        val busy = AtomicBoolean()
        return@defer this
                .filter { busy.compareAndSet(false, true) }
                .flatMap {
                    transform(it).doAfterTerminate { busy.set(false) }
                }
    }
于 2018-04-13T20:21:54.857 回答
0

我设法解决了这个问题:

/**
 * Flatmaps upstream items into [source] items.
 * Ignores upstream items if there is any [source] instance currently running.
 *
 * ```
 * upstream ----u-----u---u-------u---------------|-->
 *              ↓                 ↓               ↓
 * source       ---s-------|->    ---s-------|->  ↓
 *                 ↓                 ↓            ↓
 * result   -------s-----------------s------------|-->
 * ```
 */
fun <T, R> Observable<T>.flatMapWithDrop(source: Observable<R>): Observable<R> {
  return this.toFlowable(BackpressureStrategy.DROP)
    .flatMap({ source.toFlowable(BackpressureStrategy.MISSING) }, 1)
    .toObservable()
}
于 2018-04-13T20:48:08.037 回答