我了解如何在 akka 中制作基于消息的非阻塞应用程序,并且可以轻松模拟执行并发操作的示例并将聚合结果传递回消息中。当我的应用程序必须响应 HTTP 请求时,我很难理解我的非阻塞选项是什么。目标是接收请求并立即将其交给本地或远程参与者来完成工作,而后者又会将其交给可能需要一些时间的结果。不幸的是,在这种模式下,我不明白如何用一系列非阻塞的“告诉”而不是阻塞的“询问”来表达这一点。如果在链条中的任何一点我使用一个告诉,我不再有未来可以用作最终的响应内容(http 框架接口需要它,在这种情况下是 finagle - 但这并不重要)。我知道这个请求是在它自己的线程上的,我的例子很做作,但只是想了解我的设计选项。
总而言之,如果我下面的人为示例可以重新设计以减少阻塞,我非常喜欢了解如何。这是我一年多前进行一些简单的探索后第一次使用akka,在我看过的每一篇文章、文档和谈话中都说不要阻止服务。
概念性的答案可能会有所帮助,但也可能与我已经阅读过的相同。工作/编辑我的示例可能是我理解我试图解决的确切问题的关键。如果当前示例通常是需要做的,那么确认也是有帮助的,所以我不会搜索不存在的魔法。
注意以下别名:import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait}
object Server {
val system = ActorSystem("Example-System")
implicit val timeout = Timeout(1 seconds)
implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = {
val promise = TwitterPromise[T]
scFuture onComplete {
case Success(result) ⇒ promise.setValue(result)
case Failure(failure) ⇒ promise.setException(failure)
}
promise
}
val service = new Service[HttpRequest, HttpResponse] {
def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
case "/a/b/c" =>
val w1 = system.actorOf(Props(new Worker1))
val r = w1 ? "take work"
val response: Future[HttpResponse] = r.mapTo[String].map { c =>
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
response
}
}
//val server = Http.serve(":8080", service); TwitterAwait.ready(server)
class Worker1 extends Actor with ActorLogging {
def receive = {
case "take work" =>
val w2 = context.actorOf(Props(new Worker2))
pipe (w2 ? "do work") to sender
}
}
class Worker2 extends Actor with ActorLogging {
def receive = {
case "do work" =>
//Long operation...
sender ! "The Work"
}
}
def main(args: Array[String]) {
val r = service.apply(
com.twitter.finagle.http.Request("/a/b/c")
)
println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work
}
}
提前感谢您提供的任何指导!