6

在学习 Paul Chiusano 和 Runar Bjanarson 所著的“Scala 中的函数式编程”一书(第 7 章 - 纯函数式并行)时,我遇到了以下场景。

    package fpinscala.parallelism

    import java.util.concurrent._
    import language.implicitConversions


    object Par {
      type Par[A] = ExecutorService => Future[A]

      def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

      def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.

      private case class UnitFuture[A](get: A) extends Future[A] {
        def isDone = true
        def get(timeout: Long, units: TimeUnit) = get
        def isCancelled = false
        def cancel(evenIfRunning: Boolean): Boolean = false
      }

      def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
        (es: ExecutorService) => {
          val af = a(es)
          val bf = b(es)
          UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
        }

      def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
        es => es.submit(new Callable[A] {
          def call = a(es).get
        })

      def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

 def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get

}

你可以在这里找到 Github 上的原始代码。有关 java.util.concurrent 文档,请参见此处

我关心fork. 特别是,据称fork当 ThreadPool 太小时会导致死锁。

我考虑以下示例:

val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)  

我不希望这个例子最终陷入死锁,因为有两个线程。然而,当我在 Scala REPL 中运行它时,它会在我的计算机上运行。为什么会这样?

初始化时的输出ExecutorService是es: java.util.concurrent.ExecutorService =

java.util.concurrent.ThreadPoolE
xecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks =
 0, completed tasks = 0]

这里正确吗pool size = 0?换句话说,这是不理解java.util.concurrent._的问题还是不理解 Scala 部分的问题?

4

1 回答 1

2

好的,经过长时间的调查,我相信我有答案了。完整的故事很长,但我会尝试通过简化和避免许多细节来缩短它。

注意:潜在的 Scala 可以编译到各种不同的目标平台,但是这个特殊问题发生在作为目标的 Java/JVM 上,所以这就是这个答案的意义所在。

您看到的死锁与线程池的大小无关。实际上fork,挂起的是外部调用。它与 REPL 实现细节和多线程的组合有关,但需要学习一些内容才能理解它是如何发生的:

  • Scala REPL 是如何工作的
  • Scala 如何将objects 编译成 Java/JVM
  • Scala 如何在 Java/JVM 上模拟别名参数
  • Java/JVM 如何运行类的静态初始化器

一个较短的版本(另请参见最后的摘要)是此代码挂在 REPL 下,因为当它由 REPL 执行时,它在逻辑上类似于以下代码:

object DeadLock {

  import scala.concurrent._
  import scala.concurrent.duration.Duration
  import scala.concurrent.ExecutionContext.Implicits.global

  val foo: Int = Await.result(Future(calc()), Duration.Inf)

  def printFoo(): Unit = {
    println(s"Foo = $foo")
  }

  private def calc(): Int = {
    println("Before calc")
    42
  }
}


def test(): Unit = {
  println("Before printFoo")
  DeadLock.printFoo()
  println("After printFoo")
} 

或在 Java 世界中非常相似:

class Deadlock {
    static CompletableFuture<Integer> cf;
    static int foo;

    public static void printFoo() {
        System.out.println("Print foo " + foo);
    }

    static {
        cf = new CompletableFuture<Integer>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                calcF();
            }
        }).start();
        try {
            foo = cf.get();
            System.out.println("Future result = " + cf.get());
        } catch (InterruptedException e) {
            e.printStackTrace();f
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }


    private static void calcF() {
        cf.complete(42);
    }
}

public static void main(String[] args) {
    System.out.println("Before foo");
    Deadlock.printFoo();
    System.out.println("After foo");
}

如果您清楚为什么这段代码会死锁,那么您已经了解了大部分内容,并且可能可以自己推断出其余的内容。您可能只需浏览最后的“摘要”部分。

Java静态初始化程序如何死锁?

让我们从这个故事的结尾开始:为什么 Java 代码会挂起?发生这种情况是因为 Java/JVM 对静态初始化程序的两个保证(有关更多详细信息,请参阅第12.4.2 节。JLS 的详细初始化过程):

  • 静态初始化程序将在类的任何其他“外部”使用之前运行

  • 静态初始化程序将只运行一次,并通过全局锁定完成

用于静态初始化程序的锁是隐式的,由 JVM 管理,但它就在那里。这意味着代码在逻辑上类似于这样:

class Deadlock {

    static boolean staticInitFinished = false;
    // unique value for each thread!
    static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE);


    static CompletableFuture<Integer> cf;
    static int foo;

    static void enforceStaticInit() {
        synchronized (Deadlock.class) {
            // is init finished?
            if (staticInitFinished)
                return;
            // are we the thread already running the init?
            if(currentThreadRunsStaticInit.get())
                return;
            currentThreadRunsStaticInit.set(true);

            cf = new CompletableFuture<Integer>();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    calcF();
                }
            }).start();
            try {
                foo = cf.get();
                System.out.println("Future result = " + cf.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            currentThreadRunsStaticInit.set(false);
            staticInitFinished = true;
        }
    }

    private static void calcF() {
        enforceStaticInit();
        cf.complete(42);
    }

    public static void printFoo() {
        enforceStaticInit();
        System.out.println("Print foo " + foo);
    }
}

现在很清楚为什么这段代码会死锁:我们的静态初始化程序启动一个新线程并阻塞等待它的结果。但是那个新线程试图访问同一个类(calcF方法)并且作为另一个线程它必须等待已经运行的静态初始化程序完成。请注意,如果该calcF方法在另一个类中,一切都会正常工作。

Scala REPL 的工作原理

现在让我们回到关于 Scala REPL 如何工作的故事的开头。这个答案是对实际交易的极大简化,但它抓住了这种情况细节的重要意义。幸运的是,对于 REPL 实现者来说,Scala 编译器是用 Scala 编写的。这意味着 REPL 不必以某种方式解释代码,它可以通过标准编译器运行它,然后通过 Java Reflection API 运行编译后的代码。这仍然需要对代码进行一些修饰,以使编译器满意并返回结果。

当你输入类似

val a = Par.lazyUnit(42 + 1)

在 REPL 中,代码被分析并转换为如下内容:

package line3

object read {
    val a = Par.lazyUnit(42 + 1)
    val res3 = a
}

object eval {
    def print() = {
        println("a: Par.Par[Int] = " + read.res3)
    }
}

然后line3.eval.print()通过反射调用。

类似的故事发生在:

val es: ExecutorService = Executors.newFixedThreadPool(2)

最后当你这样做时

Par.fork(a)(es).get

事情变得更有趣了,因为您依赖于使用imports 巧妙实现的前面几行:

package line5

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    val res5 = Par.fork(a)(es).get
}

object eval {
    def print() = {
        println("res5: Int = " + read.res5)
    }
}

这里重要的是,您写入 REPL 的所有内容都被包装成一个全新的object,然后像通常的代码一样编译和运行。

Scala 如何在 Java/JVM 上模拟按名称参数

fork方法的定义使用了一个别名参数

def fork[A](a: => Par[A]): Par[A] =

在这里,它用于a懒惰地评估对整个逻辑至关重要的fork. Java/JVM 没有对惰性求值的标准支持,但它可以被模拟,这就是 Scala 编译器所做的。在内部,签名更改为使用 a Function0

def fork[A](aWrapper: () => Par[A]): Par[A] = 

并且每次访问a都替换为对 的调用aWrapper.apply()。魔术的另一部分发生在具有按名称参数的方法的调用方:那里的参数也应该被包装到 a 中Function0,因此代码变得类似于

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    val res5 = Par.fork(() => a)(es).get
}

但实际上它有点不同。天真地,只为这个小功能需要另一个类,对于这样一个简单的逻辑来说,这感觉很浪费。在 Scala 2.12 的实践中,使用了 Java 8 LambdaMetafactory的魔力,因此代码真的变成了类似

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    def aWrapper():Int = a

    val res5 = Par.fork(aWrapper _)(es).get
}

whereaWrapper _表示将方法转换为Funciton0使用LambdaMetafactory. 正如您可能从有关 Java 静态初始化程序死锁的章节中所怀疑的那样,引入def aWrapper是一个至关重要的区别。您已经可以看到,此代码与挂起的答案中的第一个 Scala 片段非常相似。

Scala 如何object在 Java/JVM 上编译

最后一个难题是如何object在 Java/JVM 中编译 Scala。好吧,它实际上被编译为类似于“静态类”的东西,但是由于您可以将object其用作对象参数,因此它必须更复杂一些。实际上,所有初始化逻辑都移到了object类的构造函数中,并且有一个简单的静态初始化器调用它。所以我们read在 Java 中的最后一个对象(忽略imports)看起来像这样:

class read$ {
    static read$ MODULE$

    static {
        new read$()
    }

    private Par[Int] res5;

    private read$() {
        MODULE$ = this;
        res5 = Par.fork(read$::aWrapper)(es).get
    }

    private static int aWrapper(){
        return line3.read$.MODULE$.a;
    }
}

这里再次read$::aWrapper表示Function0使用aWrapper. LambdaMetafactory换句话说,Scala 的初始化object被转换为作为Java 静态初始化程序的一部分运行的代码。

概括

总结一下事情是如何搞砸的:

  • REPL 将您的代码转换object为每一行的新代码并编译它

  • object初始化逻辑被翻译成Java静态初始化逻辑

  • 在简单的情况下调用带有名称参数的方法被转换为包装“返回值”逻辑的方法,并且该方法被添加到相同的classobject

  • Par.fork作为object初始化的一部分(即 Java 静态初始化程序的一部分)执行时,会尝试在不同线程上评估按名称参数(即调用同一类上的方法)并阻塞等待该线程的结果

  • Java 静态初始化程序在全局锁下逻辑执行,因此它会阻塞调用该方法的不同线程。但它本身被阻塞等待该方法调用完成。

于 2019-02-01T01:12:37.440 回答