我是 RxJava 的新手,但没有其他人回答你的问题,所以这里是一个尝试。
我的建议是让您的 Producer 类实现接口Observable.OnSubscribeFunc
(Javadoc)。在方法public Subscription onSubscribe(Observer<T> observer)
中,您可以调用您的Producer#readData()
方法来获取当前可用的数据并将该数据传递给onNext()
方法,如下所示:
// Make sure to change T as appropriate
public class Producer implements OnSubscribeFunc<T> {
. . .
@Override
public Subscription onSubscribe(Observer<T> observer) {
while (this.hasData()) {
observer.onNext(this.readData());
observer.onCompleted();
}
}
(这假设您的Producer#readData()
方法以块的形式返回数据,而不是一次全部返回。根据需要进行调整。)
然后,您可以使用方法订阅您的 Producer 对象Observable#create(OnSubscribeFunc<T>)
,并安排在计时器上让您的 Observable 返回元素:
// In your observer class
Observable<T> myProducer = Observable.create(new Producer(...));
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....);
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler));
// Now use myProducerTimed
我现在无法测试最后两行。抱歉,如果它改变了事情,我会调查并更新我的答案。
免责声明:我是一个 RxJava n00b,所以这个解决方案可能很糟糕或者可能是不必要的混乱。修复看起来不对的地方。
更新:我也遇到了 RxJava 的问题(RxJava -- Terminating Infinite Streams);看起来我的解决方案看起来很像你的(使用调度程序定期制作 Observable 返回元素)。