4

我一直在使用 Akka Supervisor Strategy 来处理业务逻辑异常。

阅读最著名的 Scala 博客系列Neophyte之一,我发现他为我一直在做的事情赋予了不同的目的。

例子:

假设我有一个 HttpActor 应该联系外部资源,如果它关闭,我将抛出一个异常,现在是一个ResourceUnavailableException.

如果我的主管发现了这一点,我会在我的 HttpActor 上调用 Restart,在我的 HttpActorpreRestart方法中,我会调用 do aschedulerOnce来重试。

演员:

class HttpActor extends Actor with ActorLogging {

  implicit val system = context.system

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting Actor due: ${reason.getCause}")
    message foreach { msg =>
      context.system.scheduler.scheduleOnce(10.seconds, self, msg)
    }
  }

  def receive = LoggingReceive {

    case g: GetRequest =>
      doRequest(http.doGet(g), g.httpManager.url, sender())
  }

主管:

class HttpSupervisor extends Actor with ActorLogging with RouterHelper {

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5) {
      case _: ResourceUnavailableException   => Restart
      case _: Exception                      => Escalate
    }

  var router = makeRouter[HttpActor](5)

  def receive = LoggingReceive {
    case g: GetRequest =>
      router.route(g, sender())

    case Terminated(a) =>
      router = router.removeRoutee(a)
      val r = context.actorOf(Props[HttpActor])
      context watch r
      router = router.addRoutee(r)
  }
}

这里有什么意义?

根据调度程序,如果我的doRequest方法抛出ResourceUnavailableException,主管将得到它并重新启动actor,强制它在一段时间后重新发送消息。我看到的优点是我免费获得重试次数和处理异常本身的好方法。

现在查看博客,他展示了一种不同的方法,以防您需要重试内容,只需发送如下消息:

def receive = {
  case EspressoRequest =>
    val receipt = register ? Transaction(Espresso)
    receipt.map((EspressoCup(Filled), _)).recover {
      case _: AskTimeoutException => ComebackLater
    } pipeTo(sender)

  case ClosingTime => context.system.shutdown()
}

在 的情况下AskTimeoutExceptionFuture他将结果作为ComebackLater对象进行管道传输,他将处理这样做:

case ComebackLater =>
      log.info("grumble, grumble")
      context.system.scheduler.scheduleOnce(300.millis) {
        coffeeSource ! EspressoRequest
      }

对我来说,这几乎是你可以用策略主管做的事情,但是以手动方式,没有内置的重试次数逻辑。

那么这里最好的方法是什么,为什么?我使用 akka 主管策略的概念完全错误吗?

4

1 回答 1

4

您可以使用BackoffSupervisor

作为内置模式,它akka.pattern.BackoffSupervisor 实现了所谓的指数退避监督策略,当子actor失败时再次启动它,每次重启之间的时间延迟都会增加。

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = "myEcho",
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly 
  ).withAutoReset(10.seconds) // the child must send BackoffSupervisor.Reset to its parent 
  .withSupervisorStrategy(
    OneForOneStrategy() {
      case _: MyException => SupervisorStrategy.Restart
      case _ => SupervisorStrategy.Escalate
    }))
于 2016-09-16T02:49:55.737 回答