问题
由于 Fork-Join 似乎是当前的炒作并在许多答案中被推荐,我想:为什么不研究一下它的实际速度呢?
为了测量这一点,我编写了一个小程序(见下面的代码),它对数字进行一些加法并使用各种参数将其分叉出来,包括线程数、分叉深度和分叉扩展,然后测量执行时间,尤其是实际计算所花费的时间与分叉所花费的时间。
摘要答案
虽然实现得很好,但 ForkJoin 是一种非常低效的并行任务方式,因为每个 fork 的成本非常高。一个简单的问题优化实现可以轻松存档 99% 的线程执行时间(这超过了使用 Fork-Join 测量的所有内容),因此这样的实现总是比 Fork-Join 实现更快。此外,如果每个 fork 的实际任务很小,则 Fork-Join 实现甚至比单线程线性实现要慢得多。
所以 Fork-Join 更多的是一个关于它是否有助于代码架构的问题,因为它与其他实现相比没有任何性能优势。因此,只有在以下情况下才应使用 Fork-Join:
性能并不重要,任务经常需要等待其他任务的结果才能继续。所以基本上如果 Fork-Join 结构大大简化了简单实现的任务。
实际任务大大超过了分叉的成本,因此损失可以忽略不计。在我的测试中,添加 2 个值的循环必须在每个 fork 中循环至少 10000 次才能获得合理的性能。
编辑:请参阅此处了解我所指出的更深入的分析。
测试设置
在我的程序中,我有一个 RecursiveTask 计算给定 N 的斐波那契数列,这将实际计算减少到 3 个作业和 1 个加法。对于任何给定的 CPU,这应该是一项次要任务。
在测试中,我改变了线程数量、每个任务的分叉数量和斐波那契循环的长度。此外,我用 async 参数做了一些测试,但是将这个设置为 false 只显示计算时间略有减少,所以我跳过了。传播参数(每个叉子的叉子)也大部分被跳过,因为结果没有显着差异。
一般来说,计算时间非常稳定,实际花费在任务上的时间百分比变化通常小于 1%,因此每个测试集在其他空闲系统上运行了大约 5 次(如果数字不稳定,则运行更多次)具有 4 个核心(+4 个超核心),然后选择了中值执行时间。
已通过各种测试变量验证了正确的执行,特别是已验证实际使用的线程数与最初给定的并行度参数没有差异。
详细的测试结果
在哪里:
Time total
是从主线程的角度来看整个计算所花费的总时间。Time task
是实际计算所有叉子组合中的斐波那契数列所花费的时间。Time task percentage
是线程的相对增益(时间任务/时间总和)。spread->depth
是(设置)传播(每个叉子的叉子)和(计算的)分叉深度。threads
是实际使用的线程数。task-time/thread
是每个线程实际花费在总体上计算斐波那契数列的时间。
传播->深度测试:
Time total: 8766.670 ms, time task: 1717.418 ms ( 19.59%), spread->depth: 2->26, thread#: 1, task-time/thread: 19.59%
Time total: 7872.244 ms, time task: 1421.478 ms ( 18.06%), spread->depth: 10-> 8, thread#: 1, task-time/thread: 18.06%
Time total: 7336.052 ms, time task: 1280.036 ms ( 17.45%), spread->depth: 100-> 4, thread#: 1, task-time/thread: 17.45%
结论:分叉的数量只有很小的影响(更少的分叉=更好),实现似乎相当复杂。使用其他设置收集了类似的结果,所以我在这里跳过这些。
Fib(0)(几乎所有时间都花在分叉上)
Time total: 7866.777 ms, time task: 1421.488 ms ( 18.07%), spread->depth: 10-> 8, thread#: 1, task-time/thread: 18.07%
Time total: 7085.142 ms, time task: 1349.207 ms ( 19.04%), spread->depth: 10-> 8, thread#: 2, task-time/thread: 9.52%
Time total: 6580.609 ms, time task: 1476.467 ms ( 22.44%), spread->depth: 10-> 8, thread#: 4, task-time/thread: 5.61%
结论:对于一个非常小的任务,大部分时间都花在了 fork 上,这使得单线程实现比任何 Fork-Join 设置快约 5 倍。即使有多个线程,使用 Fork-Join 也不可能获得任何性能提升。
纤维(100)
Time total: 12487.634 ms, time task: 5707.720 ms ( 45.71%), spread->depth: 10-> 8, thread#: 1, task-time/thread: 45.71%
Time total: 8386.855 ms, time task: 5768.881 ms ( 68.78%), spread->depth: 10-> 8, thread#: 2, task-time/thread: 34.39%
Time total: 7078.769 ms, time task: 6086.997 ms ( 85.99%), spread->depth: 10-> 8, thread#: 4, task-time/thread: 21.50%
结论:似乎已经接近单线程执行的盈亏平衡点,而多线程开始产生影响。单线程实现仍然比任何 Fork-Join 设置都快。
纤维(1000)
Time total: 5941.344 ms, time task: 5228.258 ms ( 88.00%), spread->depth: 10-> 7, thread#: 1, task-time/thread: 88.00%
Time total: 3160.818 ms, time task: 5244.241 ms (165.91%), spread->depth: 10-> 7, thread#: 2, task-time/thread: 82.96%
Time total: 16301.697 ms, time task: 53351.694 ms (327.28%), spread->depth: 10-> 8, thread#: 4, task-time/thread: 81.82%
结论:多线程执行的时间开始趋于稳定,几乎线性增益,而每个线程仍有约 20% 的计算时间用于分叉。虽然此时分叉可以通过线程提高性能,但幼稚的实现仍然会明显更快。
纤维(10000)
Time total: 5204.786 ms, time task: 5119.133 ms ( 98.35%), spread->depth: 10-> 6, thread#: 1, task-time/thread: 98.35%
Time total: 26033.889 ms, time task: 51084.118 ms (196.22%), spread->depth: 10-> 7, thread#: 2, task-time/thread: 98.11%
Time total: 13183.573 ms, time task: 51637.471 ms (391.68%), spread->depth: 10-> 7, thread#: 4, task-time/thread: 97.92%
结论:在这个数字上,计算超过了分叉的成本。虽然幼稚的实现仍然会稍微快一些,但如果任务以另一种方式实现起来要困难得多,那么分叉造成的损失可以忽略不计。
代码
public class Test {
static final int NUM_THREADS = 4;
static final int SPREAD = 10;
static final int LOOPS = 4000000;
static final int CALCULATION_N = 10000;
static final boolean DO_ASYNC = true;
//---
static final long MAX_DEPTH = Math.round(Math.log(LOOPS) / Math.log(SPREAD)); // try to have the execution take about the same time
private static class Task extends RecursiveTask<Integer> {
final static AtomicLong timeExecute = new AtomicLong(0);
final static AtomicLong totalLoops = new AtomicLong(0);
final long depth;
public Task(final long depth) {
this.depth = depth;
}
@Override
protected Integer compute() {
if (depth < MAX_DEPTH) {
final Task[] subTasks = new Task[SPREAD];
for (int i = 0; i < subTasks.length; ++i) {
subTasks[i] = new Task(depth + 1);
}
try {
invokeAll(subTasks);
final long startTime = System.nanoTime();
int result = 0;
for (final Task task : subTasks) {
if (task.isCompletedNormally()) {
result += task.get();
}
}
timeExecute.addAndGet(System.nanoTime() - startTime);
return result;
} catch (Exception e) {
this.completeExceptionally(e);
return null;
}
} else {
totalLoops.incrementAndGet();
final long startTime = System.nanoTime();
int a = 0, b = 1, h;
for (int n = 0; n < CALCULATION_N; ++n) {
h = b;
b = a + b;
a = h;
}
timeExecute.addAndGet(System.nanoTime() - startTime);
return b;
}
}
}
public static void main(String[] args) {
final AtomicInteger threadCount = new AtomicInteger(0);
final ForkJoinPool pool = new ForkJoinPool(NUM_THREADS, new ForkJoinPool.ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
threadCount.getAndIncrement();
final ForkJoinWorkerThread result = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
result.setPriority(Thread.MIN_PRIORITY);
return result;
}
}, null, DO_ASYNC);
final long startTime = System.nanoTime();
final Integer result = pool.invoke(new Task(0));
final double duration = ((double) (System.nanoTime() - startTime)) / 1000000.0;
final double executionDuration = ((double) Task.timeExecute.get()) / 1000000.0;
final double executionPercent = executionDuration / duration * 100.0;
final double executionPercentPerThread = executionPercent / ((double) NUM_THREADS);
System.out.println("Completed: " + result + " in " + Task.totalLoops.get() + " loops.");
System.out.println(String.format("Time total: %8.3f ms, time task: %8.3f ms (%6.2f%%), spread->depth: %2d->%2d, thread#: %1d, task-time/thread: %5.2f%%", duration, executionDuration, executionPercent, SPREAD, MAX_DEPTH, threadCount.get(), executionPercentPerThread));
}
}
随时指出任何错误或提出改进建议。对于一些奖励积分,我将接受最有价值的答案。