4

一个描述我的问题的简单代码示例:

import scala.util._
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

class LoserException(msg: String, dice: Int) extends Exception(msg) { def diceRoll: Int = dice }

def aPlayThatMayFail: Future[Int] = {
    Thread.sleep(1000) //throwing a dice takes some time...
    //throw a dice:
    (1 + Random.nextInt(6)) match {
        case 6 => Future.successful(6) //I win!
        case i: Int => Future.failed(new LoserException("I did not get 6...", i))
    }
}

def win(prefix: String): String = {
    val futureGameLog = aPlayThatMayFail
    futureGameLog.onComplete(t => t match {
        case Success(diceRoll) => "%s, and finally, I won! I rolled %d !!!".format(prefix, diceRoll)
        case Failure(e) => e match {
            case ex: LoserException => win("%s, and then i got %d".format(prefix, ex.diceRoll))
            case _: Throwable => "%s, and then somebody cheated!!!".format(prefix)
        }
    })
"I want to do something like futureGameLog.waitForRecursiveResult, using Await.result or something like that..."
}

win("I started playing the dice")   

这个简单的例子说明了我想要做什么。基本上,如果用文字来说,我想等待一些计算的结果,当我对先前的成功或失败的尝试进行不同的操作时。

那么你将如何实现该win方法?

我的“现实世界”问题,如果有什么不同的话,是dispatch用于异步 http 调用,我想在前一个结束时继续进行 http 调用,但是在前一个 http 调用是否成功时操作会有所不同。

4

3 回答 3

6

您可以通过递归调用恢复失败的未来:

def foo(x: Int) = x match {
  case 10 => Future.successful(x)
  case _ => Future.failed[Int](new Exception)
}

def bar(x: Int): Future[Int] = {
  foo(x) recoverWith { case _ => bar(x+1) }
}

scala> bar(0)
res0: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@64d6601

scala> res0.value
res1: Option[scala.util.Try[Int]] = Some(Success(10))

recoverWith接受 aPartialFunction[Throwable,scala.concurrent.Future[A]]并返回 a Future[A]。不过你应该小心,因为当它在这里进行大量递归调用时会占用相当多的内存。

于 2013-08-01T09:54:52.253 回答
2

正如 drexin 回答了关于异常处理和恢复的部分,让我尝试回答关于涉及期货的递归函数的部分。我相信使用 aPromise将帮助您实现目标。重组后的代码如下所示:

def win(prefix: String): String = {
    val prom = Promise[String]()

    def doWin(p:String) {
      val futureGameLog = aPlayThatMayFail
      futureGameLog.onComplete(t => t match {
          case Success(diceRoll) => prom.success("%s, and finally, I won! I rolled %d !!!".format(prefix, diceRoll))
          case Failure(e) => e match {
              case ex: LoserException => doWin("%s, and then i got %d".format(prefix, ex.diceRoll))
              case other => prom.failure(new Exception("%s, and then somebody cheated!!!".format(prefix)))
          }
      })        
    }
    doWin(prefix)
    Await.result(prom.future, someTimeout)
}

现在这不是真正的递归,因为由于期货是异步的,它将建立一个长堆栈,但它在本质上类似于递归。在这里使用 Promise 可以让您在递归执行此操作时阻止某些事情,从而阻止调用者在幕后发生的事情。

现在,如果我这样做,我可能会重新定义如下内容:

def win(prefix: String): Future[String] = {
    val prom = Promise[String]()

    def doWin(p:String) {
      val futureGameLog = aPlayThatMayFail
      futureGameLog.onComplete(t => t match {
          case Success(diceRoll) => prom.success("%s, and finally, I won! I rolled %d !!!".format(prefix, diceRoll))
          case Failure(e) => e match {
              case ex: LoserException => doWin("%s, and then i got %d".format(prefix, ex.diceRoll))
              case other => prom.failure(new Exception("%s, and then somebody cheated!!!".format(prefix)))
          }
      })        
    }
    doWin(prefix)
    prom.future
}   

这样,您可以将是否阻止或使用异步回调的决定推迟到此函数的调用者。这更灵活,但它也让调用者知道你正在做异步计算,我不确定你的场景是否可以接受。我会把这个决定留给你。

于 2013-08-01T12:02:40.783 回答
1

这对我有用:

def retryWithFuture[T](f: => Future[T],retries:Int, delay:FiniteDuration)    (implicit ec: ExecutionContext, s: Scheduler): Future[T] ={
    f.recoverWith { case _ if retries > 0 =>  after[T](delay,s)(retryWithFuture[T]( f , retries - 1 , delay)) }
}
于 2015-03-12T17:26:51.577 回答