While working through the book "Functional Programming in Scala" by Paul Chiusano and Rúnar Bjarnason (Chapter 7 - Purely functional parallelism), I came across the following scenario.
package fpinscala.parallelism

import java.util.concurrent._
import language.implicitConversions

object Par {
  type Par[A] = ExecutorService => Future[A]

  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.

  private case class UnitFuture[A](get: A) extends Future[A] {
    def isDone = true
    def get(timeout: Long, units: TimeUnit) = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
    }

  def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
    es => es.submit(new Callable[A] {
      def call = a(es).get
    })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get
}
You can find the original code on GitHub here. The documentation for java.util.concurrent is here.
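As an aside, the inline comment on map2 describes how a timeout-respecting version would have to work: measure the time spent waiting on af and give bf only whatever time remains. My rough sketch of that idea (the name Map2Future is my own, written as a standalone class rather than code from the book's repository):

import java.util.concurrent.{Future, TimeUnit}

// Sketch of a timeout-respecting Future for map2: wait on `af` first, measure how long
// that took, and give `bf` only the remaining time before applying `f`.
case class Map2Future[A, B, C](af: Future[A], bf: Future[B], f: (A, B) => C)
    extends Future[C] {
  @volatile private var cache: Option[C] = None
  def isDone: Boolean = cache.isDefined
  def isCancelled: Boolean = af.isCancelled || bf.isCancelled
  def cancel(evenIfRunning: Boolean): Boolean =
    af.cancel(evenIfRunning) || bf.cancel(evenIfRunning)
  def get: C = compute(Long.MaxValue)
  def get(timeout: Long, units: TimeUnit): C =
    compute(TimeUnit.NANOSECONDS.convert(timeout, units))
  private def compute(timeoutNanos: Long): C = cache match {
    case Some(c) => c
    case None =>
      val start = System.nanoTime
      val a = af.get(timeoutNanos, TimeUnit.NANOSECONDS)
      val elapsed = System.nanoTime - start
      val b = bf.get(timeoutNanos - elapsed, TimeUnit.NANOSECONDS)
      val c = f(a, b)
      cache = Some(c)
      c
  }
}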
My concern is with fork. In particular, fork allegedly can cause a deadlock when the thread pool is too small.
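If I understand the claim correctly, the problem already shows up with a pool of size one: lazyUnit wraps its argument in one fork, so forking it again nests two submissions, and the outer Callable then occupies the only worker thread while blocking on the inner task, which can never be scheduled. A sketch of that case (my own illustration, not code from the book's text):

import java.util.concurrent.{ExecutorService, Executors}
import fpinscala.parallelism.Par

object SingleThreadDeadlock {
  def main(args: Array[String]): Unit = {
    // lazyUnit(42 + 1) is fork(unit(42 + 1)), so forking it again nests two tasks.
    val a = Par.lazyUnit(42 + 1)
    // With only one worker thread, the outer Callable holds that thread while
    // blocking on the inner task's Future, which therefore never runs: deadlock.
    val es: ExecutorService = Executors.newFixedThreadPool(1)
    println(Par.fork(a)(es).get) // hangs forever
  }
}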
With that in mind, I am considering the following example:
import java.util.concurrent.{ExecutorService, Executors}
import fpinscala.parallelism.Par

val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)
I would not expect this example to end up in a deadlock, since there are two threads available. However, when I run it in the Scala REPL, it does deadlock on my machine. Why is that?
The output when the ExecutorService is initialized is:

es: java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoolExecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
Is pool size = 0 correct here? In other words, is this a matter of not understanding java.util.concurrent._, or of not understanding the Scala part?
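One way I could check the pool size = 0 part myself, assuming the executor returned by newFixedThreadPool is in fact a ThreadPoolExecutor (so the cast below works), would be to print the pool size before and after submitting a task:

import java.util.concurrent.{Executors, ThreadPoolExecutor}

object PoolSizeCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: newFixedThreadPool returns a ThreadPoolExecutor, so this cast works.
    val es = Executors.newFixedThreadPool(2).asInstanceOf[ThreadPoolExecutor]
    println(es.getPoolSize) // presumably 0: no worker threads created yet
    es.submit(new Runnable { def run(): Unit = () })
    println(es.getPoolSize) // presumably 1 once the first task has been submitted
    es.shutdown()
  }
}

If worker threads really are created only on demand, the first print should show 0 and the second 1, which would suggest the output above is not itself the problem.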