最近我开始使用 akka 流构建一些小型 Web 处理服务。这很简单,我从 redis 中提取 url,然后我下载这些 url(它们是图像),稍后我正在处理图像,并将它们推送到 s3 和一些 json 到 redis。
我正在从多个站点下载许多不同类型的图像,我收到了一大堆错误,例如 404、意外断开连接、响应内容长度 17951202 超过了配置的 8388608 限制、EntityStreamException:实体流截断和重定向。通过重定向,我正在调用 requestWithRedirects ,其地址位于响应的位置标头中。
负责下载的部分差不多是这样的:
override lazy val http: HttpExt = Http()
def requestWithRedirects(request: HttpRequest, retries: Int = 10)(implicit akkaSystem: ActorSystem, materializer: FlowMaterializer): Future[HttpResponse] = {
TimeoutFuture(timeout, msg = "Download timed out!") {
http.singleRequest(request)
}.flatMap {
response => handleResponse(request, response, retries)
}.recoverWith {
case e: Exception if retries > 0 =>
requestWithRedirects(request, retries = retries - 1)
}
}
TimeoutFuture 非常简单,它需要未来和超时。如果未来的时间超过超时,它会返回其他未来的超时异常。我遇到的问题是:一段时间后我收到一个错误:
Message: RuntimeException: Exceeded configured max-open-requests value of [128] akka.http.impl.engine.client.PoolInterfaceActor$$anonfun$receive$1.applyOrElse in PoolInterfaceActor.scala::109
akka.actor.Actor$class.aroundReceive in Actor.scala::467
akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorSubscriber$$super$aroundReceive in PoolInterfaceActor.scala::46
akka.stream.actor.ActorSubscriber$class.aroundReceive in ActorSubscriber.scala::208
akka.http.impl.engine.client.PoolInterfaceActor.akka$stream$actor$ActorPublisher$$super$aroundReceive in PoolInterfaceActor.scala::46
akka.stream.actor.ActorPublisher$class.aroundReceive in ActorPublisher.scala::317
akka.http.impl.engine.client.PoolInterfaceActor.aroundReceive in PoolInterfaceActor.scala::46
akka.actor.ActorCell.receiveMessage in ActorCell.scala::516
akka.actor.ActorCell.invoke in ActorCell.scala::487
akka.dispatch.Mailbox.processMailbox in Mailbox.scala::238
akka.dispatch.Mailbox.run in Mailbox.scala::220
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec in AbstractDispatcher.scala::397
scala.concurrent.forkjoin.ForkJoinTask.doExec in ForkJoinTask.java::260
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask in ForkJoinPool.java::1339
scala.concurrent.forkjoin.ForkJoinPool.runWorker in ForkJoinPool.java::1979
scala.concurrent.forkjoin.ForkJoinWorkerThread.run in ForkJoinWorkerThread.java::107
我不确定可能是什么问题,但我认为我有一些下载没有正确完成,一段时间后它们会留在一些全局连接池中,导致提到的错误。任何想法可能导致问题?或者如何尝试找到问题的根源:我已经测试了 404 响应,并且响应内容长度超出了...错误,它们似乎不是我的麻烦制造者。
编辑:很可能问题出在我的 TimeoutFuture 上。我用这里描述的错误填充它https://stackoverflow.com/a/29330010/2963977但在我看来,实际上下载图像的未来永远不会完成,它正在占用我的连接池资源。
我想知道为什么这些设置对我的情况没有任何影响:
akka.http.client.connecting-timeout = 1 s
akka.http.client.idle-timeout = 1 s
akka.http.host-connection-pool.idle-timeout = 1 s
编辑2:
显然还不支持超时。这是我的错误报告 https://github.com/akka/akka/issues/17732#issuecomment-112315953