1

我有以下示例演员代码。

object SomeExternalDep {
  private var flag = true

  // this function will throw an exception once when called with the value 3, then it won't throw another exception
  @throws[RuntimeException]
  def potentiallyThrows(curr: Int): Unit = {
    if (curr == 3 && flag) {
      flag = false
      throw new RuntimeException("Something went wrong in external dependency")
    }
  }
}

class CountingActor(start: Int, end: Int)
  extends Actor
    with ActorLogging {

  var curr: Int = start

  // This method counts for us
  private def doCount(): Unit = {
    // This may throw, which will cause this actor to fail
    SomeExternalDep.potentiallyThrows(curr)

    // Send self a count message. If the above call exceptions this is never called
    if (curr <= end) {
      self ! CountingActor.Count(curr)
    }
  }

  override def receive: Receive = {
    case CountingActor.Start => doCount()
    case CountingActor.Count(n) =>
      log.info(s"Counting: $n")
      curr += 1
      doCount()

    case x => log.error(s"bad message $x")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(s"CountingActor failed while processing $message")
    self ! CountingActor.Start
  }
}

object CountingActor {
  def props(start: Int, end: Int): Props = Props(new CountingActor(start, end))

  case object Start
  case class Count(n: Int)
}

class SupervisorActor
  extends Actor
    with ActorLogging {

  var countingActor: ActorRef = _

  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      // case _: RuntimeException => Restart
      case _: RuntimeException => Resume
    }

  private def doStart(): Unit = {
    countingActor = context.actorOf(CountingActor.props(0, 5))

    countingActor ! CountingActor.Start
  }

  override def receive: Receive = {
    case SupervisorActor.Init => doStart()
    case _ => log.error(s"Supervisor doesn't process messages")
  }

}

在这里,CountingActor基本上是向自己发送一条Count消息。然后它调用一些可能失败的外部依赖项。它还在计数时对其内部状态进行了一些更改。我还实现了一个简单的SupervisorActor. 这个演员创建了CountingActor作为它的孩子。

当监管策略设置为Restart。我得到了预期的结果。演员数到 3,因为看到异常而失败。preRestart钩子向收件箱发送一条新消息,Start并重新开始计数。

[INFO] [07/10/2019 15:23:59.895] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.896] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Counting: 2
[ERROR] [07/10/2019 15:23:59.905] [counting-sys-akka.actor.default-dispatcher-2] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency
java.lang.RuntimeException: Something went wrong in external dependency
    at SomeExternalDep$.potentiallyThrows(ActorSupervisionTest.scala:15)
    at CountingActor.CountingActor$$doCount(ActorSupervisionTest.scala:30)

<Stack Trace omitted>

[ERROR] [07/10/2019 15:23:59.909] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] CountingActor failed while processing Some(Count(2))
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 2
[INFO] [07/10/2019 15:23:59.912] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 3
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 4
[INFO] [07/10/2019 15:23:59.913] [counting-sys-akka.actor.default-dispatcher-3] [akka://counting-sys/user/$a/$a] Counting: 5

但是当我将监督策略更改为Resume. 演员卡住了,因为它在向自己发送下Count一条消息之前就失败了。

[INFO] [07/10/2019 15:26:01.779] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 0
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 1
[INFO] [07/10/2019 15:26:01.780] [counting-sys-akka.actor.default-dispatcher-5] [akka://counting-sys/user/$a/$a] Counting: 2
[WARN] [07/10/2019 15:26:01.786] [counting-sys-akka.actor.default-dispatcher-4] [akka://counting-sys/user/$a/$a] Something went wrong in external dependency

如何解决这个问题,以便在外部依赖失败时从 3 开始计数?

4

1 回答 1

2

看起来实际启动的代码您的逻辑基本上是从 1 到 N 的循环,在每次迭代中,您发送一条消息以进行下一次迭代,问题是如果抛出异常,您不会发送进入下一次迭代的消息,这里是监督者工作的地方,重新启动很简单,因为再次启动循环的代码被执行,但如果你恢复流程,进入下一次迭代的消息永远不会发送。

一个简单的解决方法是更改​​方法上的操作顺序doCount,首先将消息发送给自身,然后处理危险操作,这应该适用于该Resume策略,但我会在实际使用这种方法之前测试一些场景,一个未知的me是在Restart策略的情况下akka是否会丢弃邮箱,我相信不会,也就是说重启actor后会得到待处理的消息。

另一种解决方法可能是在恢复子角色后重新发送来自主管的消息。

编辑:我查看了akka源代码,并没有明显的方法来捕捉恢复事件,实际上有一个内部Resume事件,但它是akka私有的,不会发送给你的实际演员,我认为如果你喜欢使用该Resume策略,不要打扰主管,只需在您的演员内部捕捉可能的异常(基本上模拟恢复策略),这应该会给您预期的行为,而不是处理可能的极端情况。

于 2019-07-11T04:03:22.607 回答