4

问题

由于 Fork-Join 似乎是当前的炒作并在许多答案中被推荐,我想:为什么不研究一下它的实际速度呢?

为了测量这一点,我编写了一个小程序(见下面的代码),它对数字进行一些加法并使用各种参数将其分叉出来,包括线程数、分叉深度和分叉扩展,然后测量执行时间,尤其是实际计算所花费的时间与分叉所花费的时间。

摘要答案

虽然实现得很好,但 ForkJoin 是一种非常低效的并行任务方式,因为每个 fork 的成本非常高。一个简单的问题优化实现可以轻松存档 99% 的线程执行时间(这超过了使用 Fork-Join 测量的所有内容),因此这样的实现总是比 Fork-Join 实现更快。此外,如果每个 fork 的实际任务很小,则 Fork-Join 实现甚至比单线程线性实现要慢得多。

所以 Fork-Join 更多的是一个关于它是否有助于代码架构的问题,因为它与其他实现相比没有任何性能优势。因此,只有在以下情况下才应使用 Fork-Join:

  • 性能并不重要,任务经常需要等待其他任务的结果才能继续。所以基本上如果 Fork-Join 结构大大简化了简单实现的任务。

  • 实际任务大大超过了分叉的成本,因此损失可以忽略不计。在我的测试中,添加 2 个值的循环必须在每个 fork 中循环至少 10000 次才能获得合理的性能。

编辑:请参阅此处了解我所指出的更深入的分析。

测试设置

在我的程序中,我有一个 RecursiveTask 计算给定 N 的斐波那契数列,这将实际计算减少到 3 个作业和 1 个加法。对于任何给定的 CPU,这应该是一项次要任务。

在测试中,我改变了线程数量、每个任务的分叉数量和斐波那契循环的长度。此外,我用 async 参数做了一些测试,但是将这个设置为 false 只显示计算时间略有减少,所以我跳过了。传播参数(每个叉子的叉子)也大部分被跳过,因为结果没有显着差异。

一般来说,计算时间非常稳定,实际花费在任务上的时间百分比变化通常小于 1%,因此每个测试集在其他空闲系统上运行了大约 5 次(如果数字不稳定,则运行更多次)具有 4 个核心(+4 个超核心),然后选择了中值执行时间。

已通过各种测试变量验证了正确的执行,特别是已验证实际使用的线程数与最初给定的并行度参数没有差异。

详细的测试结果

在哪里:

  • Time total是从主线程的角度来看整个计算所花费的总时间。
  • Time task是实际计算所有叉子组合中的斐波那契数列所花费的时间。
  • Time task percentage是线程的相对增益(时间任务/时间总和)。
  • spread->depth是(设置)传播(每个叉子的叉子)和(计算的)分叉深度。
  • threads是实际使用的线程数。
  • task-time/thread是每个线程实际花费在总体上计算斐波那契数列的时间。

传播->深度测试:

Time total: 8766.670 ms, time task: 1717.418 ms ( 19.59%), spread->depth:  2->26, thread#: 1, task-time/thread: 19.59%
Time total: 7872.244 ms, time task: 1421.478 ms ( 18.06%), spread->depth: 10-> 8, thread#: 1, task-time/thread: 18.06%
Time total: 7336.052 ms, time task: 1280.036 ms ( 17.45%), spread->depth: 100-> 4, thread#: 1, task-time/thread: 17.45%

结论:分叉的数量只有很小的影响(更少的分叉=更好),实现似乎相当复杂。使用其他设置收集了类似的结果,所以我在这里跳过这些。

Fib(0)(几乎所有时间都花在分叉上)

Time total: 7866.777 ms, time task: 1421.488 ms ( 18.07%), spread->depth: 10-> 8, thread#: 1, task-time/thread: 18.07%
Time total: 7085.142 ms, time task: 1349.207 ms ( 19.04%), spread->depth: 10-> 8, thread#: 2, task-time/thread:  9.52%
Time total: 6580.609 ms, time task: 1476.467 ms ( 22.44%), spread->depth: 10-> 8, thread#: 4, task-time/thread:  5.61%

结论:对于一个非常小的任务,大部分时间都花在了 fork 上,这使得单线程实现比任何 Fork-Join 设置快约 5 倍。即使有多个线程,使用 Fork-Join 也不可能获得任何性能提升。

纤维(100)

Time total: 12487.634 ms, time task: 5707.720 ms ( 45.71%), spread->depth: 10-> 8, thread#: 1, task-time/thread: 45.71%
Time total:  8386.855 ms, time task: 5768.881 ms ( 68.78%), spread->depth: 10-> 8, thread#: 2, task-time/thread: 34.39%
Time total:  7078.769 ms, time task: 6086.997 ms ( 85.99%), spread->depth: 10-> 8, thread#: 4, task-time/thread: 21.50%

结论:似乎已经接近单线程执行的盈亏平衡点,而多线程开始产生影响。单线程实现仍然比任何 Fork-Join 设置都快。

纤维(1000)

Time total:  5941.344 ms, time task:  5228.258 ms ( 88.00%), spread->depth: 10-> 7, thread#: 1, task-time/thread: 88.00%
Time total:  3160.818 ms, time task:  5244.241 ms (165.91%), spread->depth: 10-> 7, thread#: 2, task-time/thread: 82.96%
Time total: 16301.697 ms, time task: 53351.694 ms (327.28%), spread->depth: 10-> 8, thread#: 4, task-time/thread: 81.82%

结论:多线程执行的时间开始趋于稳定,几乎线性增益,而每个线程仍有约 20% 的计算时间用于分叉。虽然此时分叉可以通过线程提高性能,但幼稚的实现仍然会明显更快。

纤维(10000)

Time total:  5204.786 ms, time task:  5119.133 ms ( 98.35%), spread->depth: 10-> 6, thread#: 1, task-time/thread: 98.35%
Time total: 26033.889 ms, time task: 51084.118 ms (196.22%), spread->depth: 10-> 7, thread#: 2, task-time/thread: 98.11%
Time total: 13183.573 ms, time task: 51637.471 ms (391.68%), spread->depth: 10-> 7, thread#: 4, task-time/thread: 97.92%

结论:在这个数字上,计算超过了分叉的成本。虽然幼稚的实现仍然会稍微快一些,但如果任务以另一种方式实现起来要困难得多,那么分叉造成的损失可以忽略不计。

代码

public class Test {

  static final int NUM_THREADS = 4;
  static final int SPREAD = 10;
  static final int LOOPS = 4000000;
  static final int CALCULATION_N = 10000;
  static final boolean DO_ASYNC = true;
  //---
  static final long MAX_DEPTH = Math.round(Math.log(LOOPS) / Math.log(SPREAD)); // try to have the execution take about the same time

  private static class Task extends RecursiveTask<Integer> {

    final static AtomicLong timeExecute = new AtomicLong(0);
    final static AtomicLong totalLoops = new AtomicLong(0);
    final long depth;

    public Task(final long depth) {
      this.depth = depth;
    }

    @Override
    protected Integer compute() {
      if (depth < MAX_DEPTH) {
        final Task[] subTasks = new Task[SPREAD];
        for (int i = 0; i < subTasks.length; ++i) {
          subTasks[i] = new Task(depth + 1);
        }
        try {
          invokeAll(subTasks);
          final long startTime = System.nanoTime();
          int result = 0;
          for (final Task task : subTasks) {
            if (task.isCompletedNormally()) {
              result += task.get();
            }
          }
          timeExecute.addAndGet(System.nanoTime() - startTime);
          return result;
        } catch (Exception e) {
          this.completeExceptionally(e);
          return null;
        }
      } else {
        totalLoops.incrementAndGet();
        final long startTime = System.nanoTime();
        int a = 0, b = 1, h;
        for (int n = 0; n < CALCULATION_N; ++n) {
          h = b;
          b = a + b;
          a = h;
        }
        timeExecute.addAndGet(System.nanoTime() - startTime);
        return b;
      }
    }
  }

  public static void main(String[] args) {
    final AtomicInteger threadCount = new AtomicInteger(0);
    final ForkJoinPool pool = new ForkJoinPool(NUM_THREADS, new ForkJoinPool.ForkJoinWorkerThreadFactory() {
      @Override
      public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        threadCount.getAndIncrement();
        final ForkJoinWorkerThread result = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        result.setPriority(Thread.MIN_PRIORITY);
        return result;
      }
    }, null, DO_ASYNC);
    final long startTime = System.nanoTime();
    final Integer result = pool.invoke(new Task(0));
    final double duration = ((double) (System.nanoTime() - startTime)) / 1000000.0;
    final double executionDuration = ((double) Task.timeExecute.get()) / 1000000.0;
    final double executionPercent = executionDuration / duration * 100.0;
    final double executionPercentPerThread = executionPercent / ((double) NUM_THREADS);

    System.out.println("Completed: " + result + " in " + Task.totalLoops.get() + " loops.");
    System.out.println(String.format("Time total: %8.3f ms, time task: %8.3f ms (%6.2f%%), spread->depth: %2d->%2d, thread#: %1d, task-time/thread: %5.2f%%", duration, executionDuration, executionPercent, SPREAD, MAX_DEPTH, threadCount.get(), executionPercentPerThread));
  }
}

随时指出任何错误或提出改进建议。对于一些奖励积分,我将接受最有价值的答案。

4

3 回答 3

5

建议:

  • 打印分叉的数量 + 已完成工作的成本估算(即,BigInteger如果您切换到它们,则添加的数量或 s 的长度相加)。这个比例将显示你的分叉策略的有效性,并让你了解什么是有意义的最小工作规模。
  • 检查你的算法——斐波那契指数增长,你的任务返回整数,所以你应该很快就会溢出。

因此,目标是选择一个阈值来表示分叉或不分叉:

使用 fork/join 并行性实现算法时要考虑的主要事项之一是选择阈值,该阈值确定任务是否将执行顺序计算而不是分叉并行子任务。

如果阈值太大,则程序可能无法创建足够的任务来充分利用可用的处理器/内核。

如果阈值太小,则任务创建和管理的开销可能会变得很大。

通常,需要进行一些实验来找到合适的阈值。资源

这也可能有用:如何确定 fork-join 任务的正确分工阈值

于 2013-11-29T15:27:57.837 回答
3

我还没有尝试过您的测试,但是对于任何分/治或排队方法,您必须权衡拆分工作、队列和作业处理以及汇总作业结果的成本。因此,与单线程版本相比,总 CPU 周期永远不会有 100% 的效率。我自己有另一个基于斐波那契的测试,我尝试设置递归限制,以便在同一个线程中递归计算 fib(limit) 而不会为下一个递归级别生成新作业。所以这个递归级别所花费的时间就是每个 ForkJoinTask 所花费的时间。我在实际基准测试之前测量了那个时间,以找到一个任务应该多长时间才能在最小开销和最大核心利用率之间取得最佳平衡。对于我测试的硬件,单路 x86 大约 10µs 到 4 路机器 1ms。

于 2013-11-29T17:23:50.227 回答
1

你的“测量”有很大的观察者效应......

您可能想用 LongAdder 替换您的 AtomicLongs 以减少测量的影响......考虑进一步减少它们......

使用像 JMH 这样的框架来缓解基准测试的陷阱......

你的测量值不是任何人都可以用来做出任何非天真的结论的东西......

FJP 是一个非常好的线程池实现,它是 JDK 中利用 cpu 内核的最佳选择。

在我的基准测试(使用 JMH)中,将 FJP 与“旧版”JDK 执行程序进行了比较:

https://github.com/zolyfarkas/spf4j/blob/master/spf4j-benchmarks/src/test/java/org/spf4j/concurrent/ThreadPoolBenchmarkFjp.java

https://github.com/zolyfarkas/spf4j/blob/master/spf4j-benchmarks/src/test/java/org/spf4j/concurrent/ThreadPoolBenchmarkStdJdk.java

在 jdk 1.7 FJP 上运行大约快 2 倍:

Benchmark                                   Mode  Cnt     Score     Error  Units
ThreadPoolBenchmarkFjp.fjpBenchmark        thrpt   10  6873.926 ± 334.733  ops/s
ThreadPoolBenchmarkStdJdk.stdJdkBenchmark  thrpt   10  3210.170 ± 170.883  ops/s

Jdk 1.8 FJP 速度提高了 3 倍:

Benchmark                                   Mode  Cnt     Score      Error  Units
ThreadPoolBenchmarkFjp.fjpBenchmark        thrpt   10  9679.502 ± 1160.887  ops/s
ThreadPoolBenchmarkStdJdk.stdJdkBenchmark  thrpt   10  3466.997 ±   81.594  ops/s
于 2015-09-12T22:39:39.837 回答