以下代码将逐行读取文件,为每一行创建一个任务,然后将其排队到执行程序。如果执行程序的队列已满,则停止从文件中读取,直到再次有空间为止。
我查看了 SO 中的一些建议,但它们要么要求将文件的全部内容读入内存,要么要求不理想的调度(例如,读取 100 行,并行处理它们,只有在完成之后,才读取接下来的 100 行) . 我也不想为此使用像 Akka 这样的库。
没有这些缺点的 Scala 实现这一目标的方法是什么?
val exec = executorWithBoundedQueue()
val lines = Source.fromFile(sourceFile, cs).getLines()
lines.map {
l => exec.submit(new Callable[String] {
override def call(): String = doStuff(l)
})
}.foreach {
s => consume(s.get())
}
exec.shutdown()
说明性定义executorWithBoundedQueue
def executorWithBoundedQueue(): ExecutorService = {
val boundedQueue = new LinkedBlockingQueue[Runnable](1000)
new ThreadPoolExecutor(boundedQueue)
}