2

我有以下顶级(“最父母”)演员:

// Groovy pseudo-code
class Master extends UntypedActor {
    ActorRef child1
    ActorRef child2
    ActorRef child3
    ActorRef backup

    @Override
    void onReceive(Object message) throws Exception {
        if(message instanceof Terminated) {
            Terminated terminated = message as Terminated
            if(terminated.actor != backup) {
                terminated.actor = backup
            } else {
                // TODO: What to do here? How to escalate from here?
            }
        } else {
            child1.tell(new DoSomething(message), getSelf())
            child2.tell(new DoSomethingElse(message), getSelf())
            child3.tell(new DoSomethingElser(message, getSelf())
        }
    }

    @Override
    SupervisorStrategy supervisorStrategy() {
        new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
            @Override
            Directive apply(Throwable t) throws Exception {
                if(isRecoverable(t) {   // Don’t worry about how/where this is defined or how it works
                    SupervisorStrategy.stop()
                } else {
                    SupervisorStrategy.escalate()
                }
            }
        })
    }
}

如您所见,它监督三个孩子,当这三个孩子抛出“可恢复”异常时,它们将被停止并替换为备份。到目前为止,一切都很好。

我现在面临的问题是,如果备用演员抛出任何可抛出的东西,我想考虑这个Master演员(实际上,我的应用程序)处于无法继续处理任何输入的状态,并升级监护人级别的例外。

我是 Akka 的新手,不知道把这段代码放在哪里,以及它应该是什么样子。同样,我只需要这样说的逻辑:

  • 如果备份 actor 抛出任何 throwable,将异常升级到Master的父级,它应该是 Akka “<a href="http://doc.akka.io/docs/akka/snapshot/general/supervision.html " rel="nofollow">guaradian 演员/构造

第一部分是我们需要知道何时从备份中抛出异常;我可以处理这部分,所以让我们假设我们的策略现在看起来像这样:

@Override
SupervisorStrategy supervisorStrategy() {
    new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
        @Override
        Directive apply(Throwable t) throws Exception {
            if(wasThrownFromBackup(t)) {
                SupervisorStrategy.escalate()
            } else if(isRecoverable(t) {
                SupervisorStrategy.stop()
            } else {
                SupervisorStrategy.escalate()
            }
        }
    })
}

但正如你所看到的,我仍在努力实现“脱离参与者系统”的升级。想法?Java 代码示例非常受欢迎,因为 Scala 在我看来就像象形文字。

4

1 回答 1

2

在此处查看“收割者”模式http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2抱歉,它在 Scala 中,但我认为它很容易转换为 Java。

也看看这里,https://groups.google.com/forum/#!topic/akka-user/QG_DL7FszMU

你应该在你的配置中设置

akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"

这将导致任何升级的“顶级”参与者被系统停止。然后你实现另一个名为'Reaper'(或任何你想称之为的)的顶级演员,它只有一个工作,观察主要的顶级演员并context.system.shutdown()在顶级演员停止时采取行动(例如)。

我不知道 akka java API,所以无法为您提供确切的示例,但在 Scala 中,来自上面的 LetItCrash 博客,它看起来像:

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = context.system.shutdown()
}

在你的应用程序启动中,你创建你的主actor,创建你的收割者,向收割者发送WatchMe(masterActor)消息。

于 2015-07-28T08:05:55.860 回答