0

这是我目前拥有的 Scala 代码:

val b = Observable.interval(1 second).map(n => if (n % 2 == 1) throw new Exception else n*n)

b.subscribe(n => println(n), e => println("error"), () => println("done"))

这是我的输出:

0
错误

如何修改我的 Observable 以便在每个错误之后继续运行,并且我将得到如下输出:

0
error
2
error
4
...
4

2 回答 2

1

您可以使用各种错误处理程序之一。我认为在你的情况下 onErrorFlatMap 可能是正确的选择:

维基

JavaDoc

不幸的是,onErrorFlatMap 不是(从 0.19.0 版开始)scala api 的一部分。

于 2014-06-04T06:16:19.400 回答
0

我也有同样的问题,并且对 Scala Rx 没有 onErrorFlatMap 感到失望。所以我尝试自己实现这个功能。

我的解决方案如下所示(参见解决方案)。关键方法是这个:

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }

“恢复”方法的详细信息

'recover' 的第一个参数是即使在它抛出异常之后你想要继续挤压事件的 observable。我尝试了各种其他方法,但这是唯一对我有用的方法。我最初期望 Scala Rx 的 onErrorReturn 将任何错误映射到我的恢复函数所指定的值,然后继续,但我错过了“可观察合同”的全部要点,即 Observable 需要停止发送任何进一步onCompleted 或 OnError 之后的事件。任何在错误后继续喷出事件的 Observable 都将被标记为“病态”(并被礼貌社会适当回避),如下所述:https ://github.com/ReactiveX/RxJava/wiki/Phantom-Operators#onerrorflatmap

成功的 onNext 调用的有效负载被封装在 Success() 中,而任何异常都将由 onErrorResumeNext 处理,它将从 (1) 封装错误的 Observable 和 (2) 封装的目标实例创建串联的 Observable 流递归调用恢复。我最初担心无限递归。但一切都很好。

限制

我应该提到,在原始发帖人的问题的情况下——它使用 Observable.interval,这不会很好,因为恢复(目标)将是原始的 Observable.interval,它会从一开始就开始发射,所以你永远不会取得进展。对于间隔之类的东西,您必须编写自己的基于计时器的间隔,可以重新启动。异常值有望为您提供足够的信息来告诉您需要重新启动的值。

A SOLUTION




object RecoverFromExceptionsInObservable extends App {

  import rx.lang.scala._
  import scala.language.postfixOps
  import scala.util.{Try, Failure, Success}


  val MILLISECS = 500L
  var tickCount = 0

  /**
   * There is a bug in this code which we will ignore for the simple purpose of
   * this test. The bug is that timers are never stopped and cleaned up.
   */
  def getTickObservable(): Observable[Int] = {
    @volatile var subscribers: Set[Observer[Int]] = Set.empty

    val t = new java.util.Timer()
    val task = new java.util.TimerTask {
      def run() = {
        subscribers.foreach(s => s.onNext(tickCount))
        tickCount += 1
      }
    }
    t.schedule(task, 0L, MILLISECS)

    Observable.create[Int] {
      (obs: Observer[Int]) => {
        subscribers = subscribers + obs
        Subscription {
          subscribers = subscribers - obs
        }
      }
    }
  }

  def recover[T](target: Observable[T]): Observable[Try[T]] = {
      target.map { Success(_) }.
        onErrorResumeNext(
          (err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
        )
  }


  val stream1 = getTickObservable() map { item =>
    if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
  }

  recover(stream1).subscribe(
    term => {
      println(s" ${Thread.currentThread().getName()}  onNext: $term")
    },
    t => {
      println("in error callback")
      println(s" ${Thread.currentThread().getName()}  onError: $t")
    },
    () => println(s" ${Thread.currentThread().getName()} subscriber complete")
  )
}

这是运行上述代码的部分输出:

 Timer-0  onNext: Success(1)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 2)
 Timer-0  onNext: Success(3)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 4)
 Timer-0  onNext: Success(5)
 Timer-0  onNext: Failure(java.lang.RuntimeException: error on 6)

我不希望这个答案永远持续下去,所以我跳过了一些关于我解决这个问题的替代方法的细节,如果你有兴趣,可以在这里阅读。

于 2015-10-24T23:46:38.057 回答