我在 Scala 中使用 Akka 演员从外部服务(HTTP 获取请求)下载资源。来自外部服务的响应是 JSON,我必须使用分页(提供程序非常慢)。我想在 10 个线程中同时下载所有分页结果。我使用这样的 URL 来下载块:http ://service.com/itmes?limit=50&offset=1000
我创建了以下管道:
ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggreator
ScatterActor 获取要下载的项目总数并将其分成块。我创建了 10 个 LoadChunkActor 来同时处理任务。
override def receive: Receive = {
case LoadMessage(limit) =>
val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
offsets.foreach(offset => context.system.actorSelection(pipe) !
LoadMessage(chunkSize, offset))
}
LoadChunkActor 使用 Spray 发送请求。演员长这样:
val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
case LoadMessage(limit, offset) =>
val uri: String = s"http://service.com/items?limit=50&offset=$offset"
val responseFuture = pipeline {Get(uri)}
responseFuture onComplete {
case Success(items) => aggregator ! Loaded(items)
}
}
如您所见,LoadChunkActor 正在从外部服务请求块并添加要在 onComplete 上运行的回调。Actor 现在准备好接收另一条消息并且他正在请求另一个块。Spray 正在使用非阻塞 API 来下载块。结果外部服务被我的请求淹没了,我得到了超时。
如何安排任务列表但我想同时处理最多 10 个?