0

我想做的是创建一个每秒运行另一个函数的函数。第二个函数返回Observables<A>,我希望第一个函数也返回Observables<A>而不是Observable<Observable<A>>

例如:

private A calcA(){
   ...
   return new A(...)
}

public Observable<A> getAs(){
   return Observable.create( subscriber -> {
      Bool condition = ...
      do {
         subscriber.onNext(calcA())
      } while (condition)
      subscriber.onComplete()
   })
}

public Observable<A> pollAs(){
   return Observable.create(subscriber -> {
      do {
         subscriber.onNext(getAs()) // Flatten here I guess
         Thread.sleep(1000)
      } while(true)
   })

所以我想做类似的事情(我试图用Java-ish方式写这个,但我会使用Kotlin)

4

2 回答 2

2

你不需要使用flatMap()操作符来展平内部的 observable,因为你只想重复订阅同一个 observable。

public Observable<A> getAs() {
   return Observable.fromCallable( () -> calcA() )
            .repeat()
            .takeWhile( v -> !condition( v );
}

getAs()将发射项目,直到达到条件。然后它将完成。

public Observable<A> pollAs(){
   return getAs()
            .repeatWhen( completed -> completed.delay(1000, TimeUnit.MILLISECONDS) );

pollAs()将不断地重新订阅getAs()observable,在每次订阅之间暂停一秒钟。

编辑:我已将一个为期 6 个月的示例上传到https://pastebin.com/kSmi24GF 它表明您必须不断推进数据出来的时间。

于 2018-04-19T23:20:34.947 回答
0

我想出了这个解决方案:

public Observable<A> pollAs() {
   return Observable.create(subscriber -> {
       do {
           getAs().subscribe(
                   { subscriber.onNext(it) },
                   { subscriber.onError(it) },
                   { Thread.sleep(1000) }
           )
       } while (true)
   })
}

我真的不喜欢这个,有人可以告诉我一个更方便的方法吗?

于 2018-04-19T21:31:26.753 回答