2

以下代码将逐行读取文件,为每一行创建一个任务,然后将其排队到执行程序。如果执行程序的队列已满,则停止从文件中读取,直到再次有空间为止。

我查看了 SO 中的一些建议,但它们要么要求将文件的全部内容读入内存,要么要求不理想的调度(例如,读取 100 行,并行处理它们,只有在完成之后,才读取接下来的 100 行) . 我也不想为此使用像 Akka 这样的库。

没有这些缺点的 Scala 实现这一目标的方法是什么?

  val exec = executorWithBoundedQueue()
  val lines = Source.fromFile(sourceFile, cs).getLines()
  lines.map {
    l => exec.submit(new Callable[String] {
      override def call(): String = doStuff(l)
    })
  }.foreach {
    s => consume(s.get())
  }
  exec.shutdown()

说明性定义executorWithBoundedQueue

def executorWithBoundedQueue(): ExecutorService = {
    val boundedQueue = new LinkedBlockingQueue[Runnable](1000)
    new ThreadPoolExecutor(boundedQueue)
}
4

0 回答 0