6

我有一个类Producer,简化它有public Object readData() 我想让这个类成为ObservableRxJava)的方法。

如何指示应该调用哪个方法?我需要将我的Producer课程转换为FutureorIterable吗?

下一个问题是readData应该每 n 秒调用一次。某些方法,例如from,具有调度程序参数,但我找不到任何如何应用它的示例。我找到了间隔方法,但它会发出一个整数序列。到目前为止,没有 Observable 我使用Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....)

4

2 回答 2

3

您仍然可以使用该interval()方法,只需忽略其结果!

final Producer producer = ...;
int n = ...;
Observable<Object> obs = 
    Observable.interval(n,TimeUnit.SECONDS)
              .map(new Func1<Integer,Object>() {
                  @Override
                  public Object call(Integer i) {
                      return producer.readData();
                  }
               ));
于 2014-02-09T07:53:09.190 回答
1

我是 RxJava 的新手,但没有其他人回答你的问题,所以这里是一个尝试。

我的建议是让您的 Producer 类实现接口Observable.OnSubscribeFuncJavadoc)。在方法public Subscription onSubscribe(Observer<T> observer)中,您可以调用您的Producer#readData()方法来获取当前可用的数据并将该数据传递给onNext()方法,如下所示:

// Make sure to change T as appropriate
public class Producer implements OnSubscribeFunc<T> {
    . . . 

    @Override
    public Subscription onSubscribe(Observer<T> observer) {
        while (this.hasData()) {
            observer.onNext(this.readData());
        observer.onCompleted();
    }
}

(这假设您的Producer#readData()方法以块的形式返回数据,而不是一次全部返回。根据需要进行调整。)

然后,您可以使用方法订阅您的 Producer 对象Observable#create(OnSubscribeFunc<T>),并安排在计时器上让您的 Observable 返回元素:

// In your observer class
Observable<T> myProducer = Observable.create(new Producer(...));

ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....);
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler));
// Now use myProducerTimed

我现在无法测试最后两行。抱歉,如果它改变了事情,我会调查并更新我的答案。

免责声明:我是一个 RxJava n00b,所以这个解决方案可能很糟糕或者可能是不必要的混乱。修复看起来不对的地方。

更新:我也遇到了 RxJava 的问题(RxJava -- Terminating Infinite Streams);看起来我的解决方案看起来很像你的(使用调度程序定期制作 Observable 返回元素)。

于 2013-12-18T17:58:39.497 回答