1

设置

使用 Akka ( 2.3.9 ) 远程处理。

我有一个 io.Tcp akka 系统,它与另一个远程后端 akka 系统通信。

问题是远程发回的消息永远不会到达 tcp 系统。

Front ( Tcp ) 系统在收到连接时注册一个“follower”类。

Follower 识别(成功)远程后端系统,然后转发接收到的数据

来自追随者的片段:

def receive = {
    case ActorIdentity(`identifyId`, Some(ref)) =>
        log.error( "Identified successfully" )
        context.watch(ref)
        context.become(active(ref))
    case ActorIdentity(`identifyId`, None) =>
        log.error( "Failed to identify remote" )
        context.stop(self)
}

def active(another: ActorRef): Actor.Receive = {
    case Terminated(`another`) => context.stop(self)
    case data: Received =>
        log.debug( "Forwarding message" )
        another forward data
    case other : Any =>
        log.warning( "Unexpected case :" + other.getClass )
}

远程系统侦听 Received 案例,然后将回复写入发件人。

 override def receive =  ( {
    case Received( data ) =>
        sender() ! Write( processData( data ) )
    case _ => log.debug( "Unexpected receive" )

} : PartialFunction[Any, Unit] ) orElse super.receive

问题

一切正常,远程 akka 系统正确读取接收到的数据,对其进行处理,但在写回结果时会自行处理。

EndpointWriter - AssociationError [akka.tcp://front-tcp@127.0.0.1:2553] -> [akka.tcp://backend@127.0.0.1:2552]: Error [akka.io.Tcp$Write; unable to create instance] 

任何想法出了什么问题?

更新:

我现在从遥控器传回一个确认(在后端系统中)

case object Ack extends Event

 override def receive =  ( {
    case Received( data ) =>
        sender() ! Write( processData( data ), Ack )

现在我被告知远程 Actor(此日志中的 MyHandler )不可序列化:

java.io.NotSerializableException: handler.MyHandler
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_65]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_65]

MyHandler 不应该是自动序列化的,因为它是一个演员吗?

更新 2

我现在只是等待向遥控器“询问”的结果:

def active(another: ActorRef): Actor.Receive = {
    case Terminated(`another`) => context.stop(self)
    case Received( message ) =>
        val futRemoteAnswer = ask( another, message ).mapTo[ Message ]
        val result = Await.result( futRemoteAnswer, timeout.duration )
        sender() ! Write( result )
    case other : Any =>
        log.warning( "Unexpected case :" + other.getClass )
}
4

0 回答 0