无法使用 Akka HTTP 10.1.0-RC2 和 Akka Streams 2.5.11 重现。以下作品:
val requests = List((HttpRequest(uri = "http://akka.io"), Promise[HttpResponse]()),
(HttpRequest(uri = "http://www.yahoo.com"), Promise[HttpResponse]()))
val poolClientFlow =
Http().superPool[Promise[HttpResponse]](settings = ConnectionPoolSettings(system).withMaxConnections(10)
Source(requests)
.map { req => println("-", req); req }
.via(poolClientFlow)
.map { resp => println("|", resp); resp }
.toMat(Sink.foreach(println))(Keep.both)
.run()
// The following is printed:
// (-,(HttpRequest(HttpMethod(GET),http://akka.io,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (-,(HttpRequest(HttpMethod(GET),http://www.yahoo.com,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Cache-Control: max-age=3600, Expires: Thu, 08 Mar 2018 15:33:46 GMT, Location: https://akka.io/, Server: cloudflare, CF-RAY: 3f8604d386979cf6-AMS),HttpEntity.Chunked(application/octet-stream),HttpProtocol(HTTP/1.1))),Future()))
// (Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Cache-Control: max-age=3600, Expires: Thu, 08 Mar 2018 15:33:46 GMT, Location: https://akka.io/, Server: cloudflare, CF-RAY: 3f8604d386979cf6-AMS),HttpEntity.Chunked(application/octet-stream),HttpProtocol(HTTP/1.1))),Future())
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Via: http/1.1 media-router-fp6.prod.media.ir2.yahoo.com (ApacheTrafficServer [c s f ]), Server: ATS, Cache-Control: no-store, no-cache, Content-Language: en, X-Frame-Options: SAMEORIGIN, Location: https://www.yahoo.com/),HttpEntity.Strict(text/html,ByteString(114, 101, 100, 105, 114, 101, 99, 116)),HttpProtocol(HTTP/1.1))),Future()))
// (Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:33:46 GMT, Connection: keep-alive, Via: http/1.1 media-router-fp6.prod.media.ir2.yahoo.com (ApacheTrafficServer [c s f ]), Server: ATS, Cache-Control: no-store, no-cache, Content-Language: en, X-Frame-Options: SAMEORIGIN, Location: https://www.yahoo.com/),HttpEntity.Strict(text/html,ByteString(114, 101, 100, 105, 114, 101, 99, 116)),HttpProtocol(HTTP/1.1))),Future())
// [WARN] [03/08/2018 14:46:31.003] [scraper-akka.actor.default-dispatcher-4] [scraper/Pool(shared->http://akka.io:80)] [0 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. GET / Empty -> 301 Moved Permanently Chunked
可能更好的方法是这样的(注意对 的调用discardEntityBytes()
):
Source(requests)
.map { req => println("-", req); req }
.via(poolClientFlow)
.map { resp => println("|", resp); resp }
.toMat(Sink.foreach({
case ((util.Success(resp), p)) =>
resp.discardEntityBytes()
p.success(resp)
case ((util.Failure(e), p)) => p.failure(e)
}))(Keep.both)
.run()
// The following is printed:
// (-,(HttpRequest(HttpMethod(GET),http://akka.io,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (-,(HttpRequest(HttpMethod(GET),http://www.yahoo.com,List(),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1)),Future()))
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:38:32 GMT, Connection: keep-alive, Via: http/1.1 media-router-fp21.prod.media.ir2.yahoo.com (ApacheTrafficServer [c s f ]), Server: ATS, Cache-Control: no-store, no-cache, Content-Language: en, X-Frame-Options: SAMEORIGIN, Location: https://www.yahoo.com/),HttpEntity.Strict(text/html,ByteString(114, 101, 100, 105, 114, 101, 99, 116)),HttpProtocol(HTTP/1.1))),Future()))
// (|,(Success(HttpResponse(301 Moved Permanently,List(Date: Thu, 08 Mar 2018 14:38:32 GMT, Connection: keep-alive, Cache-Control: max-age=3600, Expires: Thu, 08 Mar 2018 15:38:32 GMT, Location: https://akka.io/, Server: cloudflare, CF-RAY: 3f860bca84a32ba6-AMS),HttpEntity.Chunked(application/octet-stream),HttpProtocol(HTTP/1.1))),Future()))