1

我正在使用 Akka Remote Actor 创建一个示例,RemoteLookupProxyForwarder其定义在 Akka In Action 书中。我的要求就像远程创建一个演员,但使用代码配置。RemoteLookupProxyForwarder就像查找远程参与者系统一样工作,如果参与者系统可用,则创建一个远程参与者,否则等待。

在他的帮助下application.conf,这已经成功地完成了,并且作为方面的行为工作得很好。

但是使用代码的问题是,如果远程参与者不可用,则代理参与者无法查找参与者,并且当消息发送到远程参与者时,所有消息都会发送到死信。

远程代理演员代码:

class RemoteLookupProxyForwarder extends Actor with ActorLogging {

  context.setReceiveTimeout(3 seconds)
  deployAndWatch

  def deployAndWatch: Unit = {
    val actor = context.actorOf(Props[RemoteActorR1], "echo")
    context.watch(actor)
    log.info("switching to may be active state")
    context.become(maybeActive(actor))
    context.setReceiveTimeout(Duration.Undefined)
  }


  def deploying: Receive = {
    case ReceiveTimeout =>
      deployAndWatch

    case msg => log.error(s"Ignoring message $msg, remote actor is not ready yet")
  }

  def maybeActive(actor: ActorRef): Receive = {
    case Terminated(actor) =>
      log.info(s"Actor $actor terminated.")
      log.info("switching to deploying state")
      context.become(deploying)
      context.setReceiveTimeout(3 seconds)
      deployAndWatch

    case msg => actor forward msg
  }

  override def receive = deploying
}

object RemoteLookupProxyForwarder {
  def props = Props(new RemoteLookupProxyForwarder)

  def name = "forwarder"
}

远程 1 演员系统:

class RemoteActorR1 extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg => log.info(s"Server Received $msg")
  }
}

object RemoteActorR1 {

  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString(conf)
    ActorSystem("remote-r1", config)
  }

  val conf =
    """
      |akka {
      |  log-dead-letters = "OFF"
      |
      |  actor {
      |    provider = "akka.remote.RemoteActorRefProvider"
      |  }
      |
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    netty.tcp {
      |      hostname = "0.0.0.0"
      |      port = 2551
      |    }
      |  }
      |}
    """.stripMargin
}

远程 2 演员系统:

object RemoteActorR3 extends App {

  val uri = "akka.tcp://remote-r1@0.0.0.0:2551"
  val remoteR1Address = AddressFromURIString(uri)

  val props = Props[RemoteLookupProxyForwarder].withDeploy(
    Deploy(scope = RemoteScope(remoteR1Address))
  )

  val conf =
    """
      |akka {
      |  log-dead-letters = "OFF"
      |
      |  actor {
      |    provider = "akka.remote.RemoteActorRefProvider"
      |  }
      |
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    netty.tcp {
      |      hostname = "0.0.0.0"
      |      port = 2553
      |    }
      |  }
      |}
    """.stripMargin

  val config = ConfigFactory.parseString(conf)
  val ref = ActorSystem("remote-r3", config)
  val remoteR1 = ref.actorOf(props, RemoteLookupProxyForwarder.name)

  Thread.sleep(30000)
  println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
  remoteR1 ! "Hello Dude1"
  remoteR1 ! "Hello Dude2"
  remoteR1 ! "Hello Dude3"
}

使用application.confconfig这个例子运行良好,因为在config中,我们描述了两个部署,但是是代码的情况,仍然无法找到如何定义多个部署。

application.conf配置多个部署:

val conf =
    """
      |akka {
      |  log-dead-letters = "OFF"
      |
      |  actor {
      |    provider = "akka.remote.RemoteActorRefProvider"
      |
      |    deployment {
      |     /echo {
      |       remote = "akka.tcp://remote-r1@0.0.0.0:2551"
      |      }
      |
      |     /forwarder/echo {
      |       remote = "akka.tcp://remote-r1@0.0.0.0:2551"
      |      }
      |    }
      |  }
      |
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    netty.tcp {
      |      hostname = "0.0.0.0"
      |      port = 2552
      |    }
      |  }
      |}
    """.stripMargin

我的假设是,在代码中,我只定义了一个部署,这就是示例无法按预期工作的方式。那么,我们如何定义多个部署呢?

4

0 回答 0