我需要创建一个自定义 Flowable 并实现背压。我正在尝试实现某种分页。这意味着当下游请求 5 个项目时,我将“向数据源询问”项目 0 - 5。然后当下游需要另外 5 个项目时,我将获取项目 5 - 10 并发回。
到目前为止我发现的最好的事情是使用Flowable.generate
方法,但我真的不明白为什么没有办法(据我所知)如何获取requested
下游请求的项目数量。我可以使用state
生成器的属性来保存最后请求的项目的索引,所以我只需要新请求的项目的数量。我在 BiFunctionapply
中得到的发射器实例GeneratorSubscription
是从AtomicLong
. 所以投射 emmiterAtomicLong
可以得到我请求的号码。但我知道这不是“推荐”的方式。
另一方面,当您使用时,Flowable.create
您会得到具有long requested()
方法的 FlowableEmitter。usinggenerate
更适合我的用例,但现在我也很好奇什么是“正确”的使用方式Flowable.generate
。
也许我想太多了,所以请指出我正确的方向。:) 谢谢。
这是实际代码的样子(在 Kotlin 中):
Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
val requested = (emitter as AtomicLong).get().toInt() //this is bull*hit
val end = start + requested
//get items [start to end] -> items
emmiter.onNext(items)
end /*return the new state*/
})