1

我想以控制发送到服务器的最大请求数的方式配置喷雾 http 客户端。我需要这个,因为如果发送了超过 2 个请求,我发送请求的服务器会阻止我。我明白了

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://smallTasks/user/IO-HTTP#151444590]] after [15000 ms]
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://smallTasks/user/IO-HTTP#151444590]] after [15000 ms]
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://smallTasks/user/IO-HTTP#151444590]] after [15000 ms]
akka.pattern.AskTimeoutException: Ask timed out on 

我需要发送数千个请求,但在收到大约 100 个请求的响应后我被阻止了。

我有这个方法:

  implicit val system = ActorSystem("smallTasks")
  implicit val timeout = new Timeout(15.seconds)

  import system.dispatcher

  def doHttpRequest(url: String): Future[HttpResponse] = {
    (IO(Http) ? HttpRequest(GET, Uri(url))).mapTo[HttpResponse]
  }

在这里我捕获响应并在失败时重试(递归):

def getOnlineOffers(modelId: Int, count: Int = 0): Future[Any] = {

    val result = Promise[Any]()

    AkkaSys.doHttpRequest(Market.modelOffersUrl(modelId)).map(response => {
      val responseCode = response.status.intValue
      if (List(400, 404).contains(responseCode)) {
        result.success("Bad request")
      } else if (responseCode == 200) {
        Try {
          Json.parse(response.entity.asString).asOpt[JsObject]
        } match {
          case Success(Some(obj)) =>
            Try {
              (obj \\ "onlineOffers").head.as[Int]
            } match {
              case Success(offers) => result.success(offers)
              case _ => result.success("Can't find property")
            }

          case _ => result.success("Wrong body")
        }
      } else {
        result.success("Unexpected error")
      }
    }).recover { case err =>
      if (count > 5) {
        result.success("Too many tries")
      } else {
        println(err.toString)
        Thread.sleep(200)
        getOnlineOffers(modelId, count + 1).map(r => result.success(r))
      }
    }

    result.future

  }

如何正确执行此操作?可能我需要以某种方式配置akka调度程序吗?

4

1 回答 1

1

您可以使用http://spray.io/documentation/1.2.2/spray-client/并为您编写个人管道

val pipeline: Future[SendReceive] =
      for (
        Http.HostConnectorInfo(connector, _) <-
          IO(Http) ? Http.HostConnectorSetup("www.spray.io", port = 80)
      ) yield sendReceive(connector)

    val request = Get("/segment1/segment2/...")
    val responseFuture: Future[HttpResponse] = pipeline.flatMap(_(request))

获取 HttpResponse

import scala.concurrent.Await
import scala.concurrent.duration._
val response: HttpResponse = Aweit(responseFuture, ...)

转换

import spray.json._
response.entity.asString.parseJson.convertTo[T]

去检查

Try(response.entity.asString.parseJson).isSuccess

括号太多。在scala中你可以写得更短

于 2016-03-10T16:01:18.877 回答