2

我是 Akka 的新手(Java lib,v2.3.9)。我正在尝试遵循主管层次结构的最佳实践,但由于这是我的第一个 Akka 应用程序,因此在某个地方遇到了心理障碍。

在我的第一个 Akka 应用程序(实际上是一个旨在跨多个应用程序重用的库)中,来自外部世界的输入表现Process为传递给参与者的消息。使用我的应用程序的开发人员将提供一个基于文本的配置文件,该文件最终配置哪些参与者会收到发送的Process实例,哪些不会。换句话说,假设这些是我的演员课程:

// Groovy pseudo-code
class Process {
    private final Input input

    Process(Input input) {
        super()
        this.input = deepClone(input)
    }

    Input getInput() {
        deepClone(this.input)
    }
}

class StormTrooper extends UntypedActor {
    @Override
    void onReceive(Object message) {
        if(message instanceof Process) {
            // Process the message like a Storm Trooper would.
        }
    }
}

class DarthVader extends UntypedActor {
    @Override
    void onReceive(Object message) {
        if(message instanceof Process) {
            // Process the message like Darth Vader would.
        }
    }
}

class Emperor extends UntypedActor {
    @Override
    void onReceive(Object message) {
        if(message instanceof Process) {
            // Process the message like the Emperor would.
        }
    }
}

// myapp-config.json -> where the actors are configured, along with other
// app-specific configs
{
    "fizzbuzz": "true",
    "isYosemite": "false",
    "borderColor": "red",
    "processors": [
        "StormTrooper",
        "Emperor"
    ]
}

正如您在配置文件中看到的那样,只有StormTrooperEmperor被选中来接收Process消息。这最终导致创建零 (0)DarthVader个参与者。我的意图也是,这将导致对Set<ActorRef>填充的应用程序可用,StormTrooper如下Emperor所示:

class SomeApp {
    SomeAppConfig config

    static void main(String[] args) {
        String configFileUrl = args[0] // Nevermind this horrible code

        // Pretend here that configFileUrl is a valid path to
        // myapp-config.json.

        SomeApp app = new SomeApp(configFileUrl)
        app.run()
    }

    SomeApp(String url) {
        super()

        config = new SomeAppConfig(url)
    }

    void run() {
        // Since the config file only specifies StormTrooper and
        // Emperor as viable processors, the set only contains instances of
        // these ActorRef types.
        Set<ActorRef> processors = config.loadProcessors()
        ActorSystem actorSystem = config.getActorSystem()

        while(true) {
            Input input = scanForInput()
            Process process = new Process(input)

            // Notify each config-driven processor about the
            // new input we've received that they need to process.
            processors.each {
                it.tell(process, Props.self()) // This isn't correct btw
            }
        }
    }
}

因此,正如您(希望)看到的那样,我们拥有所有这些参与者(实际上,有几十个UntypedActorimpl)来处理Process消息(这些消息Input又从某个来源捕获)。至于哪些 Actor 还活着/在线来处理这些Process消息完全是配置驱动的。最后,每次应用程序收到一个Input,它都会被注入到一条Process消息中,并且该Process消息会发送给所有已配置/正在运行的参与者。

有了这个作为给定的背景故事/设置,我无法确定“演员/主管层次结构”需要是什么。在我的用例中,似乎所有参与者都是真正平等的,他们之间没有监督结构。如果该类型的参与者被配置为存在,则StormTrooper只会收到一条消息。Process其他演员子类也是如此。

我在这里完全错过了什么吗?如果所有参与者都是平等的并且层次结构本质上是“扁平”/水平的,我如何定义监督层次结构(出于容错目的)?

4

2 回答 2

2

如果您想为每个演员实例化不超过一个实例 - 您可能需要SenatorPalpatine监督这三个。如果您可以说不止一个StormTrooper- 您可能希望让JangoFett参与者负责创建(并可能杀死)它们,一些路由器也是不错的选择(它会自动监督它们)。这也将使您能够在失败时重新启动所有士兵(OneForAllStrategy),能够广播,保持一些共同的统计数据等。

带有路由器的示例(伪 Scala):

//application.conf
akka.actor.deployment {
  /palpatine/vader {
    router = broadcast-pool
    nr-of-instances = 1
  }
  /palpatine/troopers {
    router = broadcast-pool
    nr-of-instances = 10
  }
}

class Palpatine extends Actor {
    import context._

    val troopers = actorOf(FromConfig.props(Props[Trooper], 
"troopers").withSupervisorStrategy(strategy) //`strategy` is strategy for troopers

    val vader = actorOf(FromConfig.props(Props[Vader]), "vader")

    override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1) //stategy for Palpatine's children (routers itself)

    val strategy = OneForOneStrategy(maxNrOfRetries = 100, withinTimeRange = 1) //stategy for troopers

    def receive = {
         case p@Process => troopers ! p; vader ! p
         case t@Terminted => println(t)
    }
 }

这会创建基于标准akka-config的广播池。我还展示了您可以分别为它们自定义监督策略。

如果您希望某些演员因某种原因忽略消息 - 只需在演员内部实现此逻辑,例如:

class Vader extends Actor {
    def receive {
        case p@Process => ...
        case Ignore => context.become(ignore) //changes message handler to `ignore`
    }


    def ignore = {
        case x => println("Ignored message " + x)
        case UnIgnore => context.become(process)//changes message handler back
    }

}

这将动态配置忽略/取消忽略(否则它只是一个简单的if)。您可以Ignore根据一些配置向参与者发送消息:

val listOfIgnorantPathes = readFromSomeConfig()
context.actorSelection(listOfIgnoredPathes) ! Ignore

如果您想从配置控制异构广播,您还可以像 trooper 的路由器一样为 palpatine 创建广播器(只需使用组而不是池):

akka.actor.deployment {
  ... //vader, troopers configuration

  /palpatine/broadcaster {
    router = broadcast-group
    routees.paths = ["/palpatine/vader", "/palpatine/troopers"]
  }
}

class Palpatine extends Actor {
   ... //vader, troopers definitions

   val broadcaster = actorOf(FromConfig.props(), "broadcaster")

   def receive = {
     case p@Process => broadcaster ! p
   }
}

只需将 vader 排除在外,routees.paths以使他不接收Process消息。

PS Actor 永远不会孤单 - 总有 Guardian Actor(请参阅The Top-Level Supervisors),它会在出现异常时关闭整个系统。所以无论哪种SenatorPalpatine方式都可能真的成为你的救星。

PS2context.actorSelection("palpatine/*")实际上允许您向所有孩子发送消息(作为广播池和组的替代方案),因此您不需要在其中设置一组。

于 2015-04-23T14:47:50.447 回答
1

根据您的评论,您仍然希望Master演员复制和分发Processes。从概念上讲,您不会让用户(或生成输入的任何东西)为每个参与者提供一次相同的输入。他们只会提供一次消息,然后您(或Master演员)将根据需要复制消息并将其发送给每个适当的子演员。

正如dk14 的回答中所讨论的,这种方法具有增加容错性的额外好处。

于 2015-04-23T15:07:15.387 回答