线程池和 Fork/Join 的最终目标是相似的:两者都希望尽可能地利用可用的 CPU 能力来实现最大吞吐量。最大吞吐量意味着应该在很长一段时间内完成尽可能多的任务。需要做什么?(对于以下我们将假设不缺少计算任务:对于 100% 的 CPU 利用率,总是有足够的工作要做。此外,在超线程的情况下,我将“CPU”等效用于内核或虚拟内核)。
- 至少需要有与可用 CPU 一样多的线程运行,因为运行较少的线程将导致核心未使用。
- 运行的线程最多必须与可用 CPU 的数量一样多,因为运行更多线程将为调度程序创建额外的负载,调度程序将 CPU 分配给不同的线程,这会导致一些 CPU 时间用于调度程序而不是我们的计算任务。
因此我们发现,为了获得最大吞吐量,我们需要拥有与 CPU 完全相同数量的线程。在 Oracle 的模糊示例中,您既可以采用固定大小的线程池,其线程数等于可用 CPU 的数量,也可以使用线程池。这不会有什么不同,你是对的!
那么你什么时候会遇到线程池的麻烦呢?那就是如果线程阻塞,因为您的线程正在等待另一个任务完成。假设以下示例:
class AbcAlgorithm implements Runnable {
public void run() {
Future<StepAResult> aFuture = threadPool.submit(new ATask());
StepBResult bResult = stepB();
StepAResult aResult = aFuture.get();
stepC(aResult, bResult);
}
}
我们这里看到的是一个由A、B、C三个步骤组成的算法。A和B可以相互独立执行,但是步骤C需要步骤A和B的结果。这个算法所做的就是将任务A提交给线程池并直接执行任务b。之后,线程将等待任务 A 也完成并继续执行步骤 C。如果 A 和 B 同时完成,那么一切都很好。但是如果 A 比 B 花费更长的时间呢?这可能是因为任务 A 的性质决定了它,但也可能是因为一开始没有任务 A 可用的线程,任务 A 需要等待。(如果只有一个 CPU 可用,因此您的线程池只有一个线程,这甚至会导致死锁,但现在这不是重点)。关键是刚刚执行任务B的线程阻塞整个线程。由于我们拥有与 CPU 相同数量的线程,并且一个线程被阻塞,这意味着一个 CPU 处于空闲状态。
Fork/Join 解决了这个问题:在 fork/join 框架中,您将编写相同的算法,如下所示:
class AbcAlgorithm implements Runnable {
public void run() {
ATask aTask = new ATask());
aTask.fork();
StepBResult bResult = stepB();
StepAResult aResult = aTask.join();
stepC(aResult, bResult);
}
}
看起来一样,不是吗?不过线索是aTask.join
不会挡。相反,这里是工作窃取发挥作用的地方:线程将四处寻找过去已经分叉的其他任务,并将继续执行这些任务。首先它检查它自己派生的任务是否已经开始处理。所以如果A还没有被另一个线程启动,它会接下来做A,否则它会检查其他线程的队列并窃取他们的工作。一旦另一个线程的其他任务完成,它将检查 A 现在是否完成。如果是上面的算法可以调用stepC
。否则它将寻找另一个任务来窃取。因此,fork/join 池可以实现 100% 的 CPU 利用率,即使面对阻塞操作。
join
但是有一个陷阱:只有调用ForkJoinTask
s才有可能窃取工作。它不能用于外部阻塞操作,例如等待另一个线程或等待 I/O 操作。那么,等待 I/O 完成是一项常见的任务呢?在这种情况下,如果我们可以向 Fork/Join 池添加一个额外的线程,该线程将在阻塞操作完成后再次停止,这将是第二好的事情。如果ForkJoinPool
我们使用ManagedBlocker
s,实际上可以做到这一点。
斐波那契
在RecursiveTask 的 JavaDoc 中有一个使用 Fork/Join 计算斐波那契数的示例。有关经典的递归解决方案,请参见:
public static int fib(int n) {
if (n <= 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}
正如在 JavaDocs 中所解释的,这是一种计算斐波那契数的漂亮转储方法,因为该算法具有 O(2^n) 复杂度,而更简单的方法是可能的。但是这个算法非常简单易懂,所以我们坚持下去。假设我们想用 Fork/Join 来加速这个过程。一个简单的实现应该是这样的:
class Fibonacci extends RecursiveTask<Long> {
private final long n;
Fibonacci(long n) {
this.n = n;
}
public Long compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
这个任务被拆分的步骤太短了,因此这将执行得非常糟糕,但是你可以看到框架通常是如何工作的:这两个加法可以独立计算,但是我们需要它们两者来构建最终的结果。所以一半在另一个线程中完成。用线程池做同样的事情而不发生死锁(可能,但几乎没有那么简单)。
只是为了完整性:如果您真的想使用这种递归方法计算斐波那契数,这里有一个优化版本:
class FibonacciBigSubtasks extends RecursiveTask<Long> {
private final long n;
FibonacciBigSubtasks(long n) {
this.n = n;
}
public Long compute() {
return fib(n);
}
private long fib(long n) {
if (n <= 1) {
return 1;
}
if (n > 10 && getSurplusQueuedTaskCount() < 2) {
final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
f1.fork();
return f2.compute() + f1.join();
} else {
return fib(n - 1) + fib(n - 2);
}
}
}
这使子任务变得更小,因为它们仅在n > 10 && getSurplusQueuedTaskCount() < 2
为真时才被拆分,这意味着有超过 100 个方法调用要执行(n > 10
),并且没有非常多的人任务已经在等待(getSurplusQueuedTaskCount() < 2
)。
在我的计算机上(4 核(计算超线程时为 8),Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz)fib(50)
使用经典方法需要 64 秒,而使用 Fork/Join 方法只需 18 秒是一个相当显着的收益,尽管没有理论上可能的那么多。
概括
- 是的,在您的示例中,Fork/Join 与经典线程池相比没有优势。
- 当涉及阻塞时,Fork/Join 可以显着提高性能
- Fork/Join 规避了一些死锁问题