2

如果我有这样的 RoundRobinPool

val actorPoolRef = AkkaConfig.actorSystem.actorOf(RoundRobinPool(100).props(Props[MyService]))

和一个处理程序

def requestHandler(request: HttpRequest): Future[HttpResponse] = {
  val promise = Promise[HttpResponse]()
  promise.completeWith(actorPoolRef ? request)
  promise.future
}

有什么办法可以吗

  • def requestHandler,
  • 向刚刚处理请求的同一参与者发送后续消息
4

1 回答 1

0

您可以通过使用akka.pattern.ask向一个演员请求演员引用RoundRobinPool,将self.path作为 an返回Option并将响应包装为Option[ActorPath]。为了澄清我在说什么,我构建了这个简单的概念证明:

import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

object BasicRoundRobinHttpServer {
  def main(args: Array[String]): Unit = {
    run()
  }

  def run() = {
    implicit val system = ActorSystem("BasicRoundRobinHttpServer")
    import system.dispatcher
    implicit val timeout = Timeout(3 seconds)

    val myServiceActor = system.actorOf(RoundRobinPool(5).props(Props[MyService]), "myServiceActor")

    val simpleRoute: Route =
      (path("reference") & get) {
        val validResponseFuture: Option[Future[HttpResponse]] = {
          // construct the HTTP response
          val actorPathResponse: Future[Option[ActorPath]] = (myServiceActor ? "reference").mapTo[Option[ActorPath]]
          Option(actorPathResponse.map(ref => HttpResponse(
            StatusCodes.OK,
            entity = HttpEntity(
              ContentTypes.`text/html(UTF-8)`,
              s"""
                 |<html>
                 | <body>I got the actor reference: ${ref} </body>
                 |</html>
                 |""".stripMargin
            ))))
        }
        val entityFuture: Future[HttpResponse] = validResponseFuture.getOrElse(Future(HttpResponse(StatusCodes.BadRequest)))
        complete(entityFuture)
      }
    println("http GET localhost:8080/reference")
    Http().newServerAt("localhost", 8080).bind(simpleRoute)
  }
}

class MyService extends Actor with ActorLogging {
  override def receive: Receive = {
    case "reference" =>
      log.info(s"request reference at actor: ${self}")
      sender() ! Option(self.path)
    case message =>
      log.info(s"unknown message: ${message.toString}")
  }
}

从浏览器请求地址$ http GET localhost:8080/reference或使用任何 HTTP 请求者多次获得演员参考$a$b等等......

// first time
<html>
 <body>I got the actor reference: Some(akka://BasicRoundRobinHttpServer/user/myServiceActor/$a) </body>
</html>
// second time
<html>
 <body>I got the actor reference: Some(akka://BasicRoundRobinHttpServer/user/myServiceActor/$b) </body>
</html>
...
于 2021-01-12T10:22:29.443 回答