0

我有一个链,我在其中执行一些阻塞 IO 调用(例如 HTTP 调用)。我希望阻塞调用消耗一个值,继续进行而不中断,但同时丢弃所有堆积的东西,然后以相同的方式消耗下一个值。

考虑以下示例:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}

从幼稚的角度来看,我希望它会打印出类似的东西0, 10, 20, ...,但它会打印出0, 1, 2, ....

我究竟做错了什么?

编辑:

我想过天真地添加debounce来吃掉传入的流:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

但是,现在我得到一个java.lang.InterruptedException: sleep interrupted.

编辑:

似乎有效的方法如下:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

输出符合预期0, 10, 20, ...!!

这是正确的方法吗?

我注意到这throttleLast将切换到计算调度程序。有没有办法回到原来的调度程序?

编辑:

我也偶尔会遇到java.lang.InterruptedException: sleep interrupted这种变体。

4

1 回答 1

0

解决问题的最简单方法是:

fun <T> Flowable<T>.lossy() : Flowable<T> {
  return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}

通过调用lossyaFlowable它开始删除所有进来的元素,这些元素比下游消费者可以处理的速度快。

于 2019-04-10T11:25:08.093 回答