8

我了解如何在 akka 中制作基于消息的非阻塞应用程序,并且可以轻松模拟执行并发操作的示例并将聚合结果传递回消息中。当我的应用程序必须响应 HTTP 请求时,我很难理解我的非阻塞选项是什么。目标是接收请求并立即将其交给本地或远程参与者来完成工作,而后者又会将其交给可能需要一些时间的结果。不幸的是,在这种模式下,我不明白如何用一系列非阻塞的“告诉”而不是阻塞的“询问”来表达这一点。如果在链条中的任何一点我使用一个告诉,我不再有未来可以用作最终的响应内容(http 框架接口需要它,在这种情况下是 finagle - 但这并不重要)。我知道这个请求是在它自己的线程上的,我的例子很做作,但只是想了解我的设计选项。

总而言之,如果我下面的人为示例可以重新设计以减少阻塞,我非常喜欢了解如何。这是我一年多前进行一些简单的探索后第一次使用akka,在我看过的每一篇文章、文档和谈话中都说不要阻止服务。

概念性的答案可能会有所帮助,但也可能与我已经阅读过的相同。工作/编辑我的示例可能是我理解我试图解决的确切问题的关键。如果当前示例通常是需要做的,那么确认也是有帮助的,所以我不会搜索不存在的魔法。

注意以下别名:import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait}

object Server {

  val system = ActorSystem("Example-System")

  implicit val timeout = Timeout(1 seconds)

  implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = {
    val promise = TwitterPromise[T]
    scFuture onComplete {
      case Success(result)  ⇒ promise.setValue(result)
      case Failure(failure) ⇒ promise.setException(failure)
    }
    promise
  }

  val service = new Service[HttpRequest, HttpResponse] {
    def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
      case "/a/b/c" =>
        val w1 = system.actorOf(Props(new Worker1))

        val r = w1 ? "take work"

        val response: Future[HttpResponse] = r.mapTo[String].map { c =>
          val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
          resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
          resp
        }

        response
    }
  }

//val server = Http.serve(":8080", service); TwitterAwait.ready(server)

  class Worker1 extends Actor with ActorLogging {
    def receive = {
      case "take work" =>
        val w2 = context.actorOf(Props(new Worker2))
        pipe (w2 ? "do work") to sender
    }
  }

  class Worker2 extends Actor with ActorLogging {
    def receive = {
      case "do work" =>
        //Long operation...
        sender ! "The Work"
    }
  }

  def main(args: Array[String]) {
    val r = service.apply(
      com.twitter.finagle.http.Request("/a/b/c")
    )
    println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work
  }
}

提前感谢您提供的任何指导!

4

2 回答 2

5

您可以通过使用管道模式避免将未来作为消息发送- 即,Worker1您可以这样写:

pipe(w2 ? "do work") to sender

代替:

sender ! (w2 ? "do work")

现在r将是 aFuture[String]而不是 a Future[Future[String]]


更新:pipe上面的解决方案是避免让你的演员回应未来的一般方法。正如 Viktor 在下面的评论中指出的那样,在这种情况下,您可以Worker1完全退出循环,Worker2直接告诉参与者它 ( Worker1) 从以下位置获取消息:

w2.tell("do work", sender)

Worker1如果负责以某种方式对响应进行操作Worker2(通过使用mapon w2 ? "do work",将多个期货与flatMapfor-comprehension 组合等),这将不是一个选项,但如果这不是必需的,则此版本更清洁、更高效.


那会杀了一个人Await.result。您可以通过编写如下内容来摆脱另一个:

val response: Future[HttpResponse] = r.mapTo[String].map { c =>
  val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
  resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
  resp
}

现在你只需要把它Future变成一个TwitterFuture. 我不能完全告诉你如何做到这一点,但它应该是相当微不足道的,而且绝对不需要阻塞。

于 2013-08-21T18:58:51.390 回答
0

你绝对不必在这里阻塞。首先,将 twitter 内容的导入更新为:

 import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise}

您将需要 twitter ,因为这是您将从该方法返回Promise的 impl 。然后,按照特拉维斯布朗在他的回答中所说的话,这样你的演员就会以一种你没有嵌套未来的方式做出回应。完成此操作后,您应该能够将方法更改为以下内容:Futureapplyapply

def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
  case "/a/b/c" =>
    val w1 = system.actorOf(Props(new Worker1))

    val r = (w1 ? "take work").mapTo[String]
    val prom = new TwitterPromise[HttpResponse]
    r.map(toResponse) onComplete{
      case Success(resp) => prom.setValue(resp)
      case Failure(ex) => prom.setException(ex)            
    }

    prom
}

def toResponse(c:String):HttpResponse = {
  val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
  resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
  resp
}

这可能需要更多的工作。我没有在我的 IDE 中设置它,所以我不能保证它可以编译,但我相信这个想法是合理的。您从该apply方法返回的TwitterFuture是尚未完成的。当演员询问(?)的未来完成并且通过非阻塞onComplete回调发生时,它将完成。

于 2013-08-21T20:22:51.543 回答