9

我有一个用于从 Web 服务中获取逗号分隔的 Id 列表的理解。
然后我使用 Id List 进行新的调用,我的问题是 Id List 的长度可能在 10 000 左右,并且每个调用都是一个中等大小的 XML 文档。
当我同时异步请求所有 10 000 个请求时,Web 服务端点(或者它可能是 Play Framework)不太喜欢它,因为我只得到大约 500 个正确响应。

一些伪代码来突出显示意图。

for {
  respA <- WS.url(url1).get
  id <- respA.body.split(",")
  respB <- WS.url(url2 + id).get
} yield ...

如何将并发请求限制在更可行的范围内?

4

4 回答 4

10

这是一个示例应用程序,它将 10,000 个请求(通过 Play 的 WS 库)分成 1,000 个组 - 全部以异步和非阻塞方式:

package controllers

import play.api.libs.concurrent.Promise
import scala.concurrent.duration._
import play.api.libs.ws.WS
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.mvc.{Action, Controller}
import play.api.libs.ws.Response
import play.api.Logger

object Application extends Controller {

  var numRequests = 0

  def index = Action {
    Async {
      val batches: Iterator[Seq[WS.WSRequestHolder]] = requests.grouped(1000)

      val allBatchesFutureResponses = batches.foldLeft(Future.successful(Seq.empty[Response])) { (allFutureResponses, batch) =>
        allFutureResponses.flatMap { responses =>
          val batchFutures = Future.sequence(batch.map(_.get))
          batchFutures.map { batchResponses =>
            responses ++ batchResponses
          }
        }
      }

      allBatchesFutureResponses.map { responses =>
        Logger.info(responses.size.toString)
        Ok
      }
    }
  }

  def requests = (1 to 10000).map { i =>
    WS.url("http://localhost:9000/pause")
  }

  def pause = Action {
    Async {
      Logger.info(numRequests.toString)
      numRequests = numRequests + 1
      Promise.timeout(Ok, 1 seconds)
    }
  }

}
于 2013-08-02T23:47:27.097 回答
7

你需要做一些节流。

阿卡

使用一些 Akka Actor 来发出请求怎么样?查看这些使用 akka 进行节流的方法:

只是期货

如果您只想使用Futures 而没有 Akka Actors,则可以使用组合flatMap(将 HTTP 请求链接起来以一个接一个地发生)并Future.sequence获得所需的并行度。

于 2013-07-07T12:33:02.900 回答
0

您可以考虑将调用批处理到第二个 Web 服务中,并且仅在前一个批处理完成后才继续执行后续批处理。该方法可能如下所示:

val fut = for {
  ids <- WS.url(url1).get.map(res => res.body.split("").grouped(batchSize).toList)   
  responses <- processBatches(ids)
} yield responses

fut onComplete{
  case Success(responses) => //handle responses
  case Failure(ex) => //handle fail
}

def processBatches(batches:List[Array[String]]) = {
  val prom = Promise[List[Response]]()
  var trys = List[List[Response]]()

  def doProcessBatch(remainingBatches:List[Array[String]]) {      
    val batch = remainingBatches.head
    val futs = batch.map(id => WS.url(url2 + id).get).toList
    Future.sequence(futs) onComplete{ tr =>
      val list = tr.getOrElse(List()) //add better error handling here
      trys = list :: trys
      if (remainingBatches.size > 1) doProcessBatch(remainingBatches.tail)
      else prom.success(trys.flatten)
    }      
  }
  doProcessBatch(batches)
  prom.future
}

野兔的想法是点击第一个服务以获取 ids 列表,然后将其分解为由您选择的某个批次大小确定的批次。然后,处理这些批次,将 batchSize 数量的并发调用发送到第二个 ws 调用,等到所有都完成后再移动到下一个批次。完成后,您将拥有Future一个List[Response]代表对第二个服务进行的所有调用的一个。这不是生产就绪代码,因为它需要更好地处理故障(在这种情况下我只是返回一个空列表)。它也可能需要在此行上链接对recoverafter的调用:get

val futs = batch.map(id => WS.url(url2 + id).get).toList

以防止批次中的一次失败导致您丢失该批次的其余结果,但我将把这些东西留给您。我只是想向您展示一个高级概念,用于将调用批处理到第二个服务中,以不被调用淹没。

于 2013-07-07T14:01:18.770 回答
-2

使用线程池。以下 URL 描述了整个机制:http: //msdn.microsoft.com/en-us/library/ms973903.aspx

于 2013-07-07T10:55:14.917 回答