6

将 spray 1.3.2 与 akka 2.3.6 一起使用。(akka 仅用于喷雾)。
我需要读取大文件并为每一行发出一个 http 请求。
我用迭代器逐行读取文件,并为每个项目提出请求。它在某些行上成功运行,但在某些时候它开始失败:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms]
我首先认为我超载了服务,所以我将“spray.can.host-connector.max-connections”设置为 1。它运行得慢得多,但我得到了同样的错误。

这里的代码:

import spray.http.MediaTypes._
val EdnType = register(
MediaType.custom(
  mainType = "application",
  subType = "edn",
  compressible = true,
  binary = false,
  fileExtensions = Seq("edn")))

val pipeline = (
  addHeader("Accept", "application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String) = {
  val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType, data))
  val responseFuture: Future[PipelineResponse] = pipeline(request)
  responseFuture
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - "+e)} // This is where the errors are display
  f.map { p => someMoreLogic(d, p) }
}

aggrigateResults(dataLines)

我这样做是因为我不需要整个数据,只需要一些聚合。

我该如何解决这个问题并让它完全异步?

4

1 回答 1

7

Akka ask timeout 是通过 firstCompletedOf 实现的,所以在 ask 初始化的时候就开始计时。

您似乎在做的是为每一行(在地图期间)产生一个 Future - 所以您的所有调用几乎同时执行。当期货被初始化时,超时开始计数,但没有任何执行线程可供所有派生的演员完成他们的工作。因此请求超时。

我建议使用更灵活的方法,而不是“一次性”处理 - 有点类似于使用 iteratees 或 akka-streams:Work Pulling Pattern。( Github )

您提供您已经拥有的迭代器作为Epic. 引入一个Workeractor,它将执行调用和一些逻辑。如果您N workers随后生成,则最多将N同时处理几行(并且处理管道可能涉及多个步骤)。这样,您可以确保不会使执行程序超载,并且不应该发生超时。

于 2014-11-16T19:19:50.177 回答