5

我使用akka有一段时间了。我开始在我的代码中看到一些模式来解决异步 io 的延迟回复。这个实现好吗?还有另一种方法可以在没有阻塞的情况下进行延迟回复?

class ApplicationApi(asyncIo : ActorRef) extends Actor {
    // store senders to late reply
    val waiting = Map[request, ActorRef]()

    def receive = {
        // an actore request for a user, store it to late reply and ask for asyncIo actor to do the real job
        case request : GetUser => 
            waiting += (sender -> request)
            asyncIo ! AsyncGet("http://app/user/" + request.userId)
        // asyncio response, parse and reply
        case response : AsyncResponse =>
            val user = parseUser(response.body)
            waiting.remove(response.request) match {
                case Some(actor) => actor ! GetUserResponse(user)
            }
    }
}
4

1 回答 1

5

在等待回复时避免阻塞的一种方法是使用返回 a 的ask方法(也称为?运算符)发送Future(与!返回不同())。

使用onSuccessorforeach方法,您可以指定如果/当未来通过回复完成时要执行的操作。要使用它,您需要混合AskSupport特征:

class ApplicationApi(asyncIo : ActorRef) extends Actor with AskSupport {

  def receive = {
    case request: GetUser =>
      val replyTo = sender
      asyncIo ? AsyncGet("http://app/user/" + request.userId) onSuccess {
        case response: AsyncResponse =>
          val user = parseUser(response.body)
          replyTo ! GetUserResponse(user)
      }

}

避免使用这种技术来执行任何修改ApplicationApiactor状态的副作用,因为该效果将与接收循环不同步。不过,将消息转发给其他参与者应该是安全的。


顺便说一句,这里有一个技巧,可以将当前捕获sender作为模式匹配的一部分,避免稍后将其分配给变量。

trait FromSupport { this: Actor =>
  case object from {
    def unapply(msg: Any) = Some(msg, sender)
  }
}

class MyActor extends Actor with FromSupport {
  def receive = {
    case (request: GetUser) from sender =>
      // sender is now a variable (shadowing the method) that is safe to use in a closure
  }
}
于 2012-08-23T16:20:00.930 回答