0

我需要创建一个自定义 Flowable 并实现背压。我正在尝试实现某种分页。这意味着当下游请求 5 个项目时,我将“向数据源询问”项目 0 - 5。然后当下游需要另外 5 个项目时,我将获取项目 5 - 10 并发回。

到目前为止我发现的最好的事情是使用Flowable.generate方法,但我真的不明白为什么没有办法(据我所知)如何获取requested下游请求的项目数量。我可以使用state生成器的属性来保存最后请求的项目的索引,所以我只需要新请求的项目的数量。我在 BiFunctionapply中得到的发射器实例GeneratorSubscription是从AtomicLong. 所以投射 emmiterAtomicLong可以得到我请求的号码。但我知道这不是“推荐”的方式。

另一方面,当您使用时,Flowable.create您会得到具有long requested()方法的 FlowableEmitter。usinggenerate更适合我的用例,但现在我也很好奇什么是“正确”的使用方式Flowable.generate

也许我想太多了,所以请指出我正确的方向。:) 谢谢。

这是实际代码的样子(在 Kotlin 中):

Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
        val requested = (emitter as AtomicLong).get().toInt()  //this is bull*hit
        val end = start + requested
        //get items [start to end] -> items
        emmiter.onNext(items)
        end /*return the new state*/
    })
4

1 回答 1

0

好的,我发现applyBiFunction 的函数被调用了很多次,请求量(n)也是如此。因此,没有理由为此使用吸气剂。这不是我所希望的,但它显然是如何generate工作的。:)

于 2017-11-09T14:50:10.073 回答