I am trying to use cachedHostConnectionPoolHttps to make outbound HTTPS requests, but after some time, for non-2xx responses, the pool flow stops emitting elements, which stalls the entire stream. The point at which this happens is fairly random, but the behaviour is reproducible.
Here is a sample test in which the flow stops emitting results after some time, and the following assertion error is thrown: "Expected OnNext(_), yet no element signaled during 10 seconds".
import java.util.UUID
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream._
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import org.scalatest._
import scala.util.Random
class HttpPoolTest extends FlatSpec with Matchers with OptionValues with Inside with Inspectors {

  "HttpPoolTest" should "run without exception" in {
    implicit val httpSystem = ActorSystem("http-out")
    implicit val httpMat = ActorMaterializer()
    implicit val ec = httpSystem.dispatcher

    val helloFlow = Http().cachedHostConnectionPoolHttps[Int]("android.googleapis.com", 443)(httpMat)
    val requestHeaders = scala.collection.immutable.Seq[HttpHeader](
      RawHeader("Authorization", "key=random"),
      RawHeader("Content-Type", "application/json;charset=utf-8"))
    val httpRequest = new HttpRequest(HttpMethods.POST, "/gcm/send", requestHeaders,
      """
        |{
        | "registration_ids": ["rnadomdata"],
        | "delay_while_idle": true,
        | "data": {
        | "message": "Hello World",
        | "title": "Interstellar",
        | "id": "1231222i3211232228w2t1xx6wxq",
        | "notificationType": "Text"
        | }
        |}
      """.stripMargin)

    val (upstream, downstream) = TestSource.probe[(HttpRequest, Int)].via(helloFlow).toMat(TestSink.probe)(Keep.both).run()

    while (true) {
      val randWait = math.max(5000, Random.nextInt(20000)) // at least 5 seconds to process the HTTP request
      val randRequest = math.max(1, Random.nextInt(5))
      downstream.request(randRequest)
      (1 to randRequest).foreach(i => upstream.sendNext(httpRequest -> UUID.randomUUID().hashCode()))
      println(s"Waiting for $randWait millis to process")
      Thread.sleep(randWait)
      (1 to randRequest).foreach(i => noException should be thrownBy downstream.expectNext())
    }
  }
}
Any pointers as to what is going wrong here? It works for some time, so I suspect something goes wrong while handling the non-2xx responses.
I have tried setting akka.http.host-connection-pool.max-retries = 0 and akka.http.host-connection-pool.idle-timeout = infinite, but neither made a difference.
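For reference, here is a minimal sketch of one way these two overrides can be applied to the actor system used by the test. The names `PoolSettings` and `newSystem` are illustrative only and are not part of the test above. The sketch assumes the overrides are layered on top of the default configuration with `withFallback`.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object; the name is an assumption made for this sketch.
object PoolSettings {
  // The two pool settings mentioned above, layered over the default configuration.
  val poolConfig: Config = ConfigFactory.parseString(
    """
      |akka.http.host-connection-pool.max-retries = 0
      |akka.http.host-connection-pool.idle-timeout = infinite
    """.stripMargin).withFallback(ConfigFactory.load())

  // Creates the actor system for the test with the overrides in effect.
  def newSystem(): ActorSystem = ActorSystem("http-out", poolConfig)
}
```

In the test, `ActorSystem("http-out")` would then be replaced with `PoolSettings.newSystem()`.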