1

我正在查看在给定时间间隔创建的 RxScala 可观察对象:

val periodic: Observable[Long] = Observable.interval(100 millis)

periodic.foreach(x => println(x))

如果我把它放在工作表中,我会得到这个结果:

periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon$2@2cce3493

res0: Unit = ()

这让我很困惑: 的元素periodic实际上包含什么?

它们是否包含一些索引?它们是否包含创建它们的时间间隔?

4

2 回答 2

2

正如您可以在此处阅读http://reactivex.io/documentation/operators/interval.html生成的元素是Long0.

至于您的代码和结果:

在这里,您创建了 observable,并被Observable[Long]分配到periodic. 一切如预期。

scala> val periodic: Observable[Long] = Observable.interval(100 millis)
periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon$2@2cce3493

在这里,您注册了一个回调,即在发出值时会发生什么。方法的返回类型foreachUnit因为它没有合理的值,并且只是为了注册回调的副作用而发生。

periodic.foreach(x => println(x))
res0: Unit = ()

您看不到实际值,因为执行停止。尝试插入Thread.sleep.

val periodic: Observable[Long] = Observable.interval(100.millis)
periodic.foreach(x => println(x))
Thread.sleep(1000)

给出类似于

periodic: rx.lang.scala.Observable[Long] = rx.lang.scala.JavaConversions$$anon$2@207cb62f

res0: Unit = ()

0
1
2
3
4
5
6
7
8
9
res1: Unit = ()
于 2016-06-05T11:00:11.923 回答
1

问题是间隔是异步的,所以你不需要等待结果。

等待结果的另一种方法是使用 TestSubscriber

def interval(): Unit = {
addHeader("Interval observable")
Observable.interval(createDuration(100))
  .map(n => "New item emitted:" + n)
  .doOnNext(s => print("\n" + s))
  .subscribe();
new TestSubscriber[Subscription].awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
 }

您可以在此处查看更多示例https://github.com/politrons/reactiveScala

于 2016-06-27T09:09:50.323 回答