我一直在努力解决我认为是一个非常基本的问题。
我有一个Flowable
从网络中检索一组项目并发出它们的方法。
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retrieveNext();
while (item != null) {
emitter.onNext(item);
if (item.isLast) {
break;
}
item = retrieveNext();
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
// ...
}
});
我需要它在每个请求的基础上工作。也就是说,我保留一个Subscription
并subscription.request(n)
在必要时调用。
关于Backpressure的文章(“Reactive pull”部分)描述了Observer
的观点,并简单地提到了
但是,要使 [reactive pull] 起作用,Observables A 和 B 必须正确响应 request() <...> 这种支持不是 Observables 的要求
它没有详细说明原则上如何实现这种支持。
是否有实现此类Observable
/的通用模式Flowable
?我没有找到一个很好的例子或参考。我能想到的一个糟糕的选择是有一个监视器,我会在while
循环中锁定并从另一个线程emitter.requested() == 0
解锁。doOnRequest()
但是这种方法只是感觉很麻烦。
那么这种“减速”一般是如何实现的呢?
谢谢。