16

我需要使用 Akka 的 HTTP 客户端(v2.0.2)来使用 REST 服务。合乎逻辑的方法是通过主机连接池来执行此操作,因为我们预计会有大量的同时连接。Flowfor this 消耗 a(HttpRequest, T)并返回(Try[HttpResponse, T)a 。该文档表明需要一些任意类型T来管理对请求的潜在无序响应,但没有指出调用者应该如何处理返回的T.

我的第一次尝试是下面使用Intas的函数T。从许多地方调用它以确保连接使用单个池。

val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))

def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
  val unique = Random.nextInt
  Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
    case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
    case (Failure(f), `unique`) ⇒ Future.failed(f)
    case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
  }
}

问题是客户应该如何使用它T?有没有更清洁更有效的解决方案?最后,我对某些事情可能出现故障的妄想症实际上不是妄想症吗?

4

3 回答 3

25

最初我自己对此有点困惑,直到我通读了几次文档。如果您打算在池中使用单个请求,那么无论有多少不同的地方共享同一个池,T您提供的(Int在您的情况下)并不重要。因此,如果您一直在使用,那么如果您真的想要Source.single,那把钥匙总是可以的。1

但是,它确实发挥作用的地方是,如果一段代码要使用池并将多个请求一次提交到池中,并希望所有这些请求的响应。原因是响应按照从被调用的服务接收到的顺序返回,而不是按照它们提供给池的顺序返回。每个请求可能需要不同的时间,因此它们Sink按照从池中接收回来的顺序向下游流动。

假设我们有一个服务可以接受GET带有以下形式的 url 的请求:

/product/123

其中123part 是您要查找的产品的 id。如果我想一次查找1-10所有产品,每个产品都有单独的请求,这就是标识符变得很重要的地方,这样我就可以将每个HttpResponse产品与它所针对的产品 ID 关联起来。此场景的简化代码示例如下:

val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] = 
  Source(requests).
    via(pool).
    runFold(Map.empty[Int,HttpResponse]){
      case (m, (util.Success(resp), id)) => 
        m ++ Map(id -> resp)

      case (m, (util.Failure(ex), i)) =>
        //Log a failure here probably
          m
    }

当我在 中得到我的回复时fold,我也很方便地拥有每个相关联的 id,因此我可以将它们添加到Map由 id 键入的我的。如果没有这个功能,我可能不得不做一些事情,比如解析主体(如果它是 json)来尝试找出哪个响应是哪个响应是不理想的,并且不包括失败的情况。在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符。

我希望这能为你澄清一些事情。

于 2016-01-19T13:13:15.053 回答
7

在消耗基于 HTTP 的资源时,Akka HTTP 连接池是强大的盟友。如果您要一次执行单个请求,那么解决方案是:

def exec(req: HttpRequest): Future[HttpResponse] = {
  Source.single(req → 1)
    .via(pool)
    .runWith(Sink.head).flatMap {
      case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
      case (Failure(f), _) ⇒ Future.failed(f)
    }
}

因为您正在执行single请求,所以无需消除响应的歧义。但是,Akka 流很聪明。您可以同时向池提交多个请求。在这种情况下,我们传入一个Iterable[HttpRequest]. Iterable[HttpResponse]使用 a将返回的重新排序SortedMap到与原始请求相同的顺序。你可以做一个request zip response排列的东西:

def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
  Source(requests.zipWithIndex.toMap)
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
      case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
    }.map(r ⇒ r.values)
}

如果您需要按照自己的方式解包,可迭代期货的期货非常棒。只需将事物展平即可获得更简单的响应。

def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
  Source(requests.zipWithIndex.toMap)
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
      case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
    }.flatMap(r ⇒ Future.sequence(r.values))
}

我已经用所有的导入和包装器制作了这个要点,以制作一个使用 HTTP 服务的客户端。

特别感谢 @cmbaxter 提供的简洁示例。

于 2016-01-22T11:55:17.723 回答
0

有一张用于改进 akka-http 文档的公开票。请 检查这个例子

val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
     case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))(Keep.left)
  .run


val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise

val response = queue.offer(request).flatMap(buffered => {
  if (buffered) promise.future
  else Future.failed(new RuntimeException())
})
于 2017-02-02T22:00:15.523 回答