2

假设我们有 Ints 的源 Observable:

val source:Observable[Int]

我想创建另一个 Observable,产生的值与中首次出现的值的差异大于 10:

def detect() = Observable[Int](
  subscriber =>
    if (!subscriber.isUnsubscribed) {
      var start:Option[Int] = None
      source.subscribe(
        item => {
          if (start.isEmpty) {
            start = Option(item)
          }
          else {
            start.filter(v => Math.abs(item - v) > 10).foreach {
              item => subscriber.onNext(item)
            }
          }
        }
      )
      subscriber.onCompleted()
    }
)

这里我使用 var start来保存Observable的第一个值。

有没有办法简化这段代码?我不喜欢这种为 var 赋值的方法

4

2 回答 2

3

这是我想出的:

import rx.lang.scala.Observable

val source = Observable.from(List(5, 2, 3, 16, -40, 2, -70, 50))

source.scan(Option.empty[(Int, Int)]) { (acc, next) =>
  acc.map(_.copy(_2 = next)) orElse Some((next, next))
}.collect {
  case Some((start, current)) if math.abs(start - current) > 10 => current
}.subscribe(x => println(x))

印刷

16
-40
-70
50

基本上 scan 保留一个可以未初始化的累加器(None),或者可以保存一对:第一个值和从源发出的最后一个元素。然后我们只收集那些满足你的谓词的元素。

于 2016-08-26T19:55:02.057 回答
0

您只需要应用filter运算符,它会生成一个新的 Observable,它反映源 observable 的发射,但会跳过谓词测试为 false 的那些:

val filtered = source.filter(v => Math.abs(item - v) > 10)
于 2016-08-26T19:30:57.757 回答