我有一个链,我在其中执行一些阻塞 IO 调用(例如 HTTP 调用)。我希望阻塞调用消耗一个值,继续进行而不中断,但同时丢弃所有堆积的东西,然后以相同的方式消耗下一个值。
考虑以下示例:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
从幼稚的角度来看,我希望它会打印出类似的东西0, 10, 20, ...
,但它会打印出0, 1, 2, ...
.
我究竟做错了什么?
编辑:
我想过天真地添加debounce
来吃掉传入的流:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
但是,现在我得到一个java.lang.InterruptedException: sleep interrupted
.
编辑:
似乎有效的方法如下:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
输出符合预期0, 10, 20, ...
!!
这是正确的方法吗?
我注意到这throttleLast
将切换到计算调度程序。有没有办法回到原来的调度程序?
编辑:
我也偶尔会遇到java.lang.InterruptedException: sleep interrupted
这种变体。