
I am trying to use cachedHostConnectionPoolHttps to make outbound HTTPS requests, but after some time, for non-2xx responses, the pool flow stops emitting elements, which stalls the entire flow. When this happens is fairly random, but it does happen and is reproducible.

Here is a sample test in which the flow stops emitting results after some time; it fails with "Expected OnNext(_), yet no element signaled during 10 seconds":

import java.util.UUID

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream._
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import org.scalatest._

import scala.util.Random

class HttpPoolTest extends FlatSpec with Matchers with OptionValues with Inside with Inspectors {

  "HttpPoolTest" should "run without exception" in {
    implicit val httpSystem = ActorSystem("http-out")
    implicit val httpMat = ActorMaterializer()
    implicit val ec = httpSystem.dispatcher

    val helloFlow = Http().cachedHostConnectionPoolHttps[Int]("android.googleapis.com",443)(httpMat)

    val requestHeaders = scala.collection.immutable.Seq[HttpHeader](RawHeader("Authorization", "key=random" ), RawHeader("Content-Type", "application/json;charset=utf-8"))
    val httpRequest = new HttpRequest(HttpMethods.POST, "/gcm/send", requestHeaders,
      """
        |{
        |    "registration_ids": ["rnadomdata"],
        |    "delay_while_idle": true,
        |    "data": {
        |        "message": "Hello World",
        |        "title": "Interstellar",
        |        "id": "1231222i3211232228w2t1xx6wxq",
        |        "notificationType": "Text"
        |    }
        |}
      """.
        stripMargin)

    val (upstream , downstream) = TestSource.probe[(HttpRequest, Int)].via(helloFlow).toMat(TestSink.probe)(Keep.both).run()


    while (true) {
      val randWait = math.max(5000, Random.nextInt(20000)) // at least 5 seconds to process the HTTP request
      val randRequest = math.max(1, Random.nextInt(5))
      downstream.request(randRequest)

      (1 to randRequest).foreach(i => upstream.sendNext(httpRequest -> UUID.randomUUID().hashCode()))
      println (s"Waiting for $randWait millis to process")
      Thread.sleep(randWait)
      (1 to randRequest) foreach (i => noException should be thrownBy downstream.expectNext())

    }
  }

}

Any pointers as to what is going wrong here? It works for some time, so I suspect something goes wrong while handling the non-2xx responses.

I have tried setting akka.http.host-connection-pool.max-retries = 0 and akka.http.host-connection-pool.idle-timeout = infinite, but neither helped.


1 Answer


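You need to consume the entity's data bytes to make sure the httpFlow keeps running. If you are not interested in interpreting the body of the httpResponse, simply read it through and discard it.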

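// expectNext() yields (Try[HttpResponse], Int); runWith (unlike to) actually runs the drain. Needs akka.stream.scaladsl.Sink.
downstream.expectNext()._1.foreach(_.entity.dataBytes.runWith(Sink.ignore))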
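
For a flow that is not driven by test probes, the same idea can be sketched as a small stage placed right after the pool flow. This is only a minimal sketch, assuming Akka HTTP's scaladsl and an implicit Materializer in scope; DrainResponses, drain and drainAll are hypothetical names, not part of Akka. The likely reason it helps is that the pool does not free a connection until the response entity has been consumed, so unread bodies can eventually occupy every connection in the pool.

```scala
import akka.{Done, NotUsed}
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

import scala.concurrent.Future
import scala.util.{Success, Try}

// Hypothetical helper object; the names are illustrative only.
object DrainResponses {

  // Reads the response body to the end and discards every chunk.
  // The returned future completes once the entity is fully consumed.
  def drain(response: HttpResponse)(implicit mat: Materializer): Future[Done] =
    response.entity.dataBytes.runWith(Sink.ignore)

  // Stage to place after the pool flow: starts draining the body of each
  // successful response and passes the element through unchanged.
  // Failed elements carry no entity, so they are forwarded as-is.
  def drainAll[T](implicit mat: Materializer)
      : Flow[(Try[HttpResponse], T), (Try[HttpResponse], T), NotUsed] =
    Flow[(Try[HttpResponse], T)].map {
      case result @ (Success(response), _) =>
        drain(response) // fire and forget; status and headers stay readable
        result
      case failed => failed
    }
}
```

In the test from the question this would be attached as .via(helloFlow).via(DrainResponses.drainAll[Int]). Anything that does need the body should read it from the entity instead of using this stage, since a streamed entity can only be consumed once.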

answered 2016-03-08T08:09:36.667