我正在使用 Akka Remote Actor 创建一个示例,RemoteLookupProxyForwarder
其定义在 Akka In Action 书中。我的要求就像远程创建一个演员,但使用代码配置。RemoteLookupProxyForwarder
就像查找远程参与者系统一样工作,如果参与者系统可用,则创建一个远程参与者,否则等待。
在他的帮助下application.conf
,这已经成功地完成了,并且作为方面的行为工作得很好。
但是使用代码的问题是,如果远程参与者不可用,则代理参与者无法查找参与者,并且当消息发送到远程参与者时,所有消息都会发送到死信。
远程代理演员代码:
class RemoteLookupProxyForwarder extends Actor with ActorLogging {
context.setReceiveTimeout(3 seconds)
deployAndWatch
def deployAndWatch: Unit = {
val actor = context.actorOf(Props[RemoteActorR1], "echo")
context.watch(actor)
log.info("switching to may be active state")
context.become(maybeActive(actor))
context.setReceiveTimeout(Duration.Undefined)
}
def deploying: Receive = {
case ReceiveTimeout =>
deployAndWatch
case msg => log.error(s"Ignoring message $msg, remote actor is not ready yet")
}
def maybeActive(actor: ActorRef): Receive = {
case Terminated(actor) =>
log.info(s"Actor $actor terminated.")
log.info("switching to deploying state")
context.become(deploying)
context.setReceiveTimeout(3 seconds)
deployAndWatch
case msg => actor forward msg
}
override def receive = deploying
}
object RemoteLookupProxyForwarder {
def props = Props(new RemoteLookupProxyForwarder)
def name = "forwarder"
}
远程 1 演员系统:
class RemoteActorR1 extends Actor with ActorLogging {
override def receive: Receive = {
case msg => log.info(s"Server Received $msg")
}
}
object RemoteActorR1 {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.parseString(conf)
ActorSystem("remote-r1", config)
}
val conf =
"""
|akka {
| log-dead-letters = "OFF"
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
|
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| netty.tcp {
| hostname = "0.0.0.0"
| port = 2551
| }
| }
|}
""".stripMargin
}
远程 2 演员系统:
object RemoteActorR3 extends App {
val uri = "akka.tcp://remote-r1@0.0.0.0:2551"
val remoteR1Address = AddressFromURIString(uri)
val props = Props[RemoteLookupProxyForwarder].withDeploy(
Deploy(scope = RemoteScope(remoteR1Address))
)
val conf =
"""
|akka {
| log-dead-letters = "OFF"
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
|
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| netty.tcp {
| hostname = "0.0.0.0"
| port = 2553
| }
| }
|}
""".stripMargin
val config = ConfigFactory.parseString(conf)
val ref = ActorSystem("remote-r3", config)
val remoteR1 = ref.actorOf(props, RemoteLookupProxyForwarder.name)
Thread.sleep(30000)
println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
remoteR1 ! "Hello Dude1"
remoteR1 ! "Hello Dude2"
remoteR1 ! "Hello Dude3"
}
使用application.conf
config这个例子运行良好,因为在config中,我们描述了两个部署,但是是代码的情况,仍然无法找到如何定义多个部署。
application.conf
配置多个部署:
val conf =
"""
|akka {
| log-dead-letters = "OFF"
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
|
| deployment {
| /echo {
| remote = "akka.tcp://remote-r1@0.0.0.0:2551"
| }
|
| /forwarder/echo {
| remote = "akka.tcp://remote-r1@0.0.0.0:2551"
| }
| }
| }
|
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| netty.tcp {
| hostname = "0.0.0.0"
| port = 2552
| }
| }
|}
""".stripMargin
我的假设是,在代码中,我只定义了一个部署,这就是示例无法按预期工作的方式。那么,我们如何定义多个部署呢?