2

我正在尝试向ClusterRouter配置中的所有路由广播消息。我已经尝试了两种选择。这个:

 val workerRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(AdaptiveLoadBalancingRouter(metrics), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")

  context.system.scheduler.schedule(2 seconds, 5 seconds, workerRouter, Broadcast(CapabilityRequest))

和这个:

 val broadcastRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(BroadcastRouter(Nil), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")

  context.system.scheduler.schedule(2 seconds, 5 seconds, broadcastRouter, CapabilityRequest)

但是对于他们两个来说,只有一个slaves接收到消息。想法?


为了理解为什么我认为第一次尝试应该奏效,必须AdaptiveLoadBalancingRounter.scalaAdaptiveLoadBalancingRouterLiketrait 中查看 的Route创建时间:

{
  case (sender, message) ⇒
    message match {
      case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
      case msg            ⇒ List(Destination(sender, getNext()))
    }
}
4

1 回答 1

2

在您的第一个示例中,您使用的路由器只会发送到一个路由。从我读过的文档中,该路由器将使用来自不同节点的可用指标来选择似乎受到最少胁迫的节点,并向该节点上的路由发送消息。我认为您在此设置中看到的行为是预期的。

对于您的第二个示例,我在文档中没有看到有关BraodcastRouter在集群环境中使用 a 的任何内容,因此我不确定是否支持这种方法。话虽如此,我的猜测是BraodcastRouter使用空的 routees ( Nil) 列表创建是导致您看到的行为的原因。我想如果你改变它,BroadcastRouter(100)你可能会看到不同的行为。但是同样,我不认为(基于文档中缺少示例)BroadcastRouter支持使用 a (我可能是错的)。

您能否对您的用例进行更多解释,以便我理解您为什么需要为您的集群使用广播类型的路由器?

编辑

FWIW,我得到了使用以下代码的东西。首先,配置:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-remote-lifecycle-events = off
    netty {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    min-nr-of-members = 2
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551", 
      "akka://ClusterSystem@127.0.0.1:2552"]

    auto-down = on
  }
}

然后,我使用以下代码启动了两个节点(一个在 2551 上,另一个在 2552 上):

object ClusterNode {

  def main(args: Array[String]): Unit = {

    // Override the configuration of the port 
    // when specified as program argument
    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))


    // Create an Akka system
    val system = ActorSystem("ClusterSystem")
    val clusterListener = system.actorOf(Props(new Actor with ActorLogging {
      def receive = {
        case state: CurrentClusterState =>
          log.info("Current members: {}", state.members)
        case MemberJoined(member) =>
          log.info("Member joined: {}", member)
        case MemberUp(member) =>
          log.info("Member is Up: {}", member)
        case UnreachableMember(member) =>
          log.info("Member detected as unreachable: {}", member)
        case _: ClusterDomainEvent => // ignore

      }
    }), name = "clusterListener")

    Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])    
  }

}

class FooActor extends Actor{

  override def preStart = {
    println("Foo actor started on path: " + context.self.path)
  }

  def receive = {
    case msg => println(context.self.path + " received message: " + msg)
  }
}

然后,我使用以下代码启动了第三个“节点”,即我的客户端节点:

object ClusterClient {
  def main(args: Array[String]) {
    val system = ActorSystem("ClusterSystem")

    Cluster(system) registerOnMemberUp{
      val router = system.actorOf(Props[FooActor].withRouter(
        ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
        ClusterRouterSettings(
        totalInstances = 20, maxInstancesPerNode = 10,
        allowLocalRoutees = false))),
        name = "fooRouter")  

     router ! Broadcast("bar")
    }
  }
}

当消息发送时,我看到它在两个服务器节点虚拟机中都收到了,每个虚拟机有 10 个参与者。

我的路由器和你的不同之处在于我没有指定本地路由,而是换成routeesPathmaxInstancesPerNode. 我希望这有帮助。

于 2013-05-21T01:01:02.957 回答