0

我正在尝试在 Akka Actor 中执行以下 Scala 代码。

class FilteringService(implicit timeout: Timeout) extends Actor {

  def receive: PartialFunction[Any, Unit] = {

    case GetProfiles ⇒
      val requester = sender
      def getProfiles = {

        var result = new Array[Profile](0)

        println("[GET-PROFILES] Entered, making request")
        val req = Get("http://localhost:9090/profiles")
        implicit val profileFormat = jsonFormat16(Profile)


        val responseFuture: Future[HttpResponse] = Http().singleRequest(req)
        println("[GET-PROFILES] Entered, request sent")
        responseFuture.onComplete {

          case Success(response) =>
            println("[RES - SUCCESS] Request returned with " + response.status)
            val responseAsProfiles =  Unmarshal(response.entity).to[Array[Profile]]
            responseAsProfiles.onComplete {
            println("[UNMARSH - SUCCESS] Unmarshaling Done!")
            _.get match {
              case profiles: Array[Profile] =>
                println("[UNMARSH - SUCCESS] Sending Profiles message to " + sender())
                requester ! profiles
                println("[UNMARSH - SUCCESS] Message sent to " + sender())
              case _ => println("error")
              }
            }

          case Failure(_)   =>
            sys.error("something wrong")
            //return Future[Array[Profile]]

        }
      }
      println("[RECEIVE] Message GetProfiles received from " + sender().toString())
      getProfiles
      println("[RECEIVE] Message GetProfiles invoked")

  }

当 Actor 收到消息“GetProfiles”时:

1-它向远程服务器发送请求,因此操作结果是 Future[HttpResponse]

2- 如果成功,它会检索响应(一个 JSON 数组)并要求将对象解组为 Array[Profile]。(Profile 模型并不重要)。Unmarshall 方法的结果是 Future[Array[Profile]]

3-如果成功,我想将结果发送回原始发件人!

我设法做到了,但这是一个技巧,因为我将发件人保存在一个变量中,该变量在范围(请求者)中可见。我知道有管道模式,所以理论上我可以将responseAsProfiles对象发送回发送者,但是该对象是在responseFuture对象的 onComplete 方法中创建的(当然,我们必须等待它!)

就是这样!在这种情况下,如何使用管道模式将结果发送回发送者?提前致谢!!!

4

1 回答 1

1

一般的想法是您使用mapand组成期货,flatMap并尽量避免使用onComplete

看看您是否可以将代码转换为以下较小的部分,然后编写:

def getRawProfileData(): Future[HttpResponse] = {
 // ... here you make http request
}

def unmarshalProfiles(response: HttpResponse): Future[List[Profile]] = {
  // ... unmarshalling logic
}

def getProfiles(): Future[List[Profile]] = getRawProfileData().flatMape(unmarshalProfiles)

// now from receive block

case GetProfiles ⇒ getProfiles().pipeTo(sender())

于 2020-09-17T17:57:43.497 回答