设置
使用 Akka ( 2.3.9 ) 远程处理。
我有一个 io.Tcp akka 系统,它与另一个远程后端 akka 系统通信。
问题是远程发回的消息永远不会到达 tcp 系统。
Front ( Tcp ) 系统在收到连接时注册一个“follower”类。
Follower 识别(成功)远程后端系统,然后转发接收到的数据
来自追随者的片段:
def receive = {
case ActorIdentity(`identifyId`, Some(ref)) =>
log.error( "Identified successfully" )
context.watch(ref)
context.become(active(ref))
case ActorIdentity(`identifyId`, None) =>
log.error( "Failed to identify remote" )
context.stop(self)
}
def active(another: ActorRef): Actor.Receive = {
case Terminated(`another`) => context.stop(self)
case data: Received =>
log.debug( "Forwarding message" )
another forward data
case other : Any =>
log.warning( "Unexpected case :" + other.getClass )
}
远程系统侦听 Received 案例,然后将回复写入发件人。
override def receive = ( {
case Received( data ) =>
sender() ! Write( processData( data ) )
case _ => log.debug( "Unexpected receive" )
} : PartialFunction[Any, Unit] ) orElse super.receive
问题
一切正常,远程 akka 系统正确读取接收到的数据,对其进行处理,但在写回结果时会自行处理。
EndpointWriter - AssociationError [akka.tcp://front-tcp@127.0.0.1:2553] -> [akka.tcp://backend@127.0.0.1:2552]: Error [akka.io.Tcp$Write; unable to create instance]
任何想法出了什么问题?
更新:
我现在从遥控器传回一个确认(在后端系统中)
case object Ack extends Event
override def receive = ( {
case Received( data ) =>
sender() ! Write( processData( data ), Ack )
现在我被告知远程 Actor(此日志中的 MyHandler )不可序列化:
java.io.NotSerializableException: handler.MyHandler
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_65]
MyHandler 不应该是自动序列化的,因为它是一个演员吗?
更新 2
我现在只是等待向遥控器“询问”的结果:
def active(another: ActorRef): Actor.Receive = {
case Terminated(`another`) => context.stop(self)
case Received( message ) =>
val futRemoteAnswer = ask( another, message ).mapTo[ Message ]
val result = Await.result( futRemoteAnswer, timeout.duration )
sender() ! Write( result )
case other : Any =>
log.warning( "Unexpected case :" + other.getClass )
}