我也有同样的问题,并且对 Scala Rx 没有 onErrorFlatMap 感到失望。所以我尝试自己实现这个功能。
我的解决方案如下所示(参见解决方案)。关键方法是这个:
def recover[T](target: Observable[T]): Observable[Try[T]] = {
target.map { Success(_) }.
onErrorResumeNext(
(err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
)
}
“恢复”方法的详细信息
'recover' 的第一个参数是即使在它抛出异常之后你想要继续挤压事件的 observable。我尝试了各种其他方法,但这是唯一对我有用的方法。我最初期望 Scala Rx 的 onErrorReturn 将任何错误映射到我的恢复函数所指定的值,然后继续,但我错过了“可观察合同”的全部要点,即 Observable 需要停止发送任何进一步onCompleted 或 OnError 之后的事件。任何在错误后继续喷出事件的 Observable 都将被标记为“病态”(并被礼貌社会适当回避),如下所述:https ://github.com/ReactiveX/RxJava/wiki/Phantom-Operators#onerrorflatmap
成功的 onNext 调用的有效负载被封装在 Success() 中,而任何异常都将由 onErrorResumeNext 处理,它将从 (1) 封装错误的 Observable 和 (2) 封装的目标实例创建串联的 Observable 流递归调用恢复。我最初担心无限递归。但一切都很好。
限制
我应该提到,在原始发帖人的问题的情况下——它使用 Observable.interval,这不会很好,因为恢复(目标)将是原始的 Observable.interval,它会从一开始就开始发射,所以你永远不会取得进展。对于间隔之类的东西,您必须编写自己的基于计时器的间隔,可以重新启动。异常值有望为您提供足够的信息来告诉您需要重新启动的值。
A SOLUTION
object RecoverFromExceptionsInObservable extends App {
import rx.lang.scala._
import scala.language.postfixOps
import scala.util.{Try, Failure, Success}
val MILLISECS = 500L
var tickCount = 0
/**
* There is a bug in this code which we will ignore for the simple purpose of
* this test. The bug is that timers are never stopped and cleaned up.
*/
def getTickObservable(): Observable[Int] = {
@volatile var subscribers: Set[Observer[Int]] = Set.empty
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run() = {
subscribers.foreach(s => s.onNext(tickCount))
tickCount += 1
}
}
t.schedule(task, 0L, MILLISECS)
Observable.create[Int] {
(obs: Observer[Int]) => {
subscribers = subscribers + obs
Subscription {
subscribers = subscribers - obs
}
}
}
}
def recover[T](target: Observable[T]): Observable[Try[T]] = {
target.map { Success(_) }.
onErrorResumeNext(
(err: Throwable) => Observable.just(Failure(err)) ++ recover(target)
)
}
val stream1 = getTickObservable() map { item =>
if (item % 2 == 0) throw new RuntimeException(s"error on $item") else item
}
recover(stream1).subscribe(
term => {
println(s" ${Thread.currentThread().getName()} onNext: $term")
},
t => {
println("in error callback")
println(s" ${Thread.currentThread().getName()} onError: $t")
},
() => println(s" ${Thread.currentThread().getName()} subscriber complete")
)
}
这是运行上述代码的部分输出:
Timer-0 onNext: Success(1)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 2)
Timer-0 onNext: Success(3)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 4)
Timer-0 onNext: Success(5)
Timer-0 onNext: Failure(java.lang.RuntimeException: error on 6)
我不希望这个答案永远持续下去,所以我跳过了一些关于我解决这个问题的替代方法的细节,如果你有兴趣,可以在这里阅读。