我需要使用 Akka 的 HTTP 客户端(v2.0.2)来使用 REST 服务。合乎逻辑的方法是通过主机连接池来执行此操作,因为我们预计会有大量的同时连接。Flow
for this 消耗 a(HttpRequest, T)
并返回(Try[HttpResponse, T)
a 。该文档表明需要一些任意类型T
来管理对请求的潜在无序响应,但没有指出调用者应该如何处理返回的T
.
我的第一次尝试是下面使用Int
as的函数T
。从许多地方调用它以确保连接使用单个池。
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
case (Failure(f), `unique`) ⇒ Future.failed(f)
case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
}
}
问题是客户应该如何使用它T
?有没有更清洁更有效的解决方案?最后,我对某些事情可能出现故障的妄想症实际上不是妄想症吗?