3

使用 Akka-IO TCP,在 actor 中建立连接的过程如下:

class MyActor(remote: InetSocketAddress) extends Actor {

  IO(Tcp) ! Connect(remote)    //this is the first step, remote is the address to connect to

  def receive = {
    case CommandFailed(_: Connect) => context stop self // failed to connect

    case Connected(remote, local) =>
      val connection = sender()
      connection ! Register(self)
      // do cool things...  
  }
}

您发送一条Connect消息IO(Tcp)并期望收到一条CommandFailed或一条Connected消息。

现在,我的目标是创建一个包装 TCP 连接的参与者,但我希望我的参与者仅在建立连接后才开始接受消息 - 否则,在等待Connected消息时它将开始接受查询但没有人可以发送他们到。

我尝试了什么:

class MyActor(address: InetSocketAddress) extends Actor {

  def receive = {
    case Initialize =>
      IO(Tcp) ! Connect(address)
      context.become(waitForConnection(sender()))

    case other => sender ! Status.Failure(new Exception(s"Connection to $address not established yet."))
  }

  private def waitForConnection(initializer: ActorRef): Receive = {
    case Connected(_, _) =>
      val connection = sender()
      connection ! Register(self)
      initializer ! Status.Success(Unit)
      // do cool things

    case CommandFailed(_: Connect) =>
      initializer ! Status.Failure(new Exception("Failed to connect to " + host))
      context stop self
  }
}

我的第一个receive是期待一条Initialize会触发整个连接过程的虚构消息,一旦完成,发送者就会Initialize收到一条成功消息并且知道它可以知道开始发送查询。

我对它不是很满意,它迫使我创造我的演员

val actor = system.actorOf(MyActor.props(remote))
Await.ready(actor ? Initialize, timeout)

而且它不会非常“重启”友好。

有什么想法可以保证我的演员在 Tcp 层回复之前不会开始从邮箱接收消息Connected吗?

4

2 回答 2

5

使用Stash特性来存储您现在无法处理的消息。当每个过早的消息到达时,使用stash()它来推迟它。连接打开后,使用unstashAll()将这些消息返回到邮箱进行处理。然后您可以使用become()来切换到消息处理状态。

于 2014-08-06T16:31:14.027 回答
2

为了让你的actor对重启更友好,你可以重写与actor生命周期相关的方法,例如preStartpostStopAkka 文档对 actor 的 start、stop 和 restart 钩子有很好的解释。

class MyActor(remote: InetSocketAddress) extends Actor {

  override def preStart() {
    IO(Tcp) ! Connect(remote) 
  }

  ...
}

现在您可以使用val actor = system.actorOf(MyActor.props(remote)). 它在启动时建立连接,并在重新启动时重新建立连接。

于 2014-08-07T07:12:06.420 回答