0

我一直在努力解决我认为是一个非常基本的问题。

我有一个Flowable从网络中检索一组项目并发出它们的方法。

Flowable
    .create(new FlowableOnSubscribe<Item>() {
        @Override
        public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
            Item item = retrieveNext();

            while (item != null) {
                emitter.onNext(item);

                if (item.isLast) {
                    break;
                }

                item = retrieveNext();
            }

            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
    .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) throws Exception {
            // ...
        }
    });

我需要它在每个请求的基础上工作。也就是说,我保留一个Subscriptionsubscription.request(n)在必要时调用。

关于Backpressure的文章(“Reactive pull”部分)描述了Observer的观点,并简单地提到了

但是,要使 [reactive pull] 起作用,Observables A 和 B 必须正确响应 request() <...> 这种支持不是 Observables 的要求

它没有详细说明原则上如何实现这种支持。

是否有实现此类Observable/的通用模式Flowable?我没有找到一个很好的例子或参考。我能想到的一个糟糕的选择是有一个监视器,我会在while循环中锁定并从另一个线程emitter.requested() == 0解锁。doOnRequest()但是这种方法只是感觉很麻烦。

那么这种“减速”一般是如何实现的呢?

谢谢。

4

1 回答 1

2

使用Flowable.generate

Flowable.generate(
  () -> generateItemRetriever(),
  (Retriever<Item> retriever, Emitter<Item> emitter) -> {
    Item item = retriever.retrieveNext();
    if(item!=null && !item.isLast()) {
      emitter.onNext(item);
    } else {
      emitter.onComplete();
    }
  }
)

Thgenerated Flowable 将具有背压感知和退订感知。如果您Retriever有需要处理的状态,请使用 3 参数generate重载。

但是,如果您正在执行 HTTP 调用,我建议您研究类似 Retrofit 的东西,它与 RxJava 非常吻合。

于 2017-02-28T16:00:04.123 回答