145

使用新的fork/join 框架比在开始时简单地将大任务分成 N 个子任务,将它们发送到缓存线程池(来自Executors)并等待每个任务完成有什么好处?我看不到使用 fork/join 抽象如何简化问题或使解决方案比我们多年来所拥有的更有效。

例如,教程示例中的并行化模糊算法可以这样实现:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

在开始时拆分并将任务​​发送到线程池:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

这些任务进入线程池的队列,当工作线程可用时,它们会从队列中执行。只要拆分足够细化(以避免必须特别等待最后一个任务)并且线程池有足够的(至少 N 个处理器)线程,所有处理器都在全速工作,直到整个计算完成。

我错过了什么吗?使用 fork/join 框架的附加价值是什么?

4

11 回答 11

146

我认为基本的误解是,Fork/Join 示例没有显示工作窃取,而只是某种标准的分而治之。

偷工作会是这样的:工人 B 已经完成了他的工作。他是一个善良的人,所以他环顾四周,看到工人A仍在努力工作。他走过去问道:“嘿,小伙子,我可以帮你一把。” 一个回复。“酷,我有这个1000个单位的任务,到现在我已经完成了345个剩下655个,你能不能把673到1000的工作,我做346到672的工作。” B说:“好吧,我们开始吧,我们可以早点去酒吧。”

你看 - 即使他们开始真正的工作,工人也必须相互沟通。这是示例中缺少的部分。

另一方面,示例仅显示“使用分包商”之类的内容:

工人A:“老兄,我有1000个工作单位,对我来说太多了。我自己做500个,再分包给别人500个。” 这种情况一直持续到大任务被分解成每个 10 个单位的小包。这些将由可用的工人执行。但是,如果一个包是一种毒丸,并且比其他包花费的时间要长得多——运气不好,分裂阶段就结束了。

Fork/Join 和预先拆分任务之间唯一剩下的区别是:预先拆分时,工作队列从一开始就已满。示例:1000 个单位,阈值为 10,因此队列有 100 个条目。这些数据包被分发到线程池成员。

Fork/Join 更复杂,并试图保持队列中的数据包数量更小:

  • 步骤 1:将一个包含 (1...1000) 的数据包放入队列
  • 第 2 步:一名工作人员弹出数据包 (1...1000) 并将其替换为两个数据包:(1...500) 和 (501...1000)。
  • 第 3 步:一名工人弹出数据包 (500...1000) 并推送 (500...750) 和 (751...1000)。
  • 第 n 步:堆栈包含以下数据包:(1..500)、(500...750)、(750...875)...(991..1000)
  • 步骤n+1:数据包(991..1000)被弹出并执行
  • 步骤 n+2:数据包 (981..990) 被弹出并执行
  • 步骤 n+3:数据包 (961..980) 被弹出并拆分为 (961...970) 和 (971..980)。……

您会看到:在 Fork/Join 中,队列更小(示例中为 6),并且“拆分”和“工作”阶段是交错的。

当多个工作人员同时弹出和推送时,交互当然不是那么清晰。

于 2011-10-28T11:49:29.280 回答
28

如果您有 n 个繁忙的线程都以 100% 独立工作,那将比 Fork-Join (FJ) 池中的 n 个线程更好。但它永远不会这样。

可能无法将问题精确地分成 n 个相等的部分。即使你这样做了,线程调度也离公平还有一段距离。你最终会等待最慢的线程。如果您有多个任务,那么它们每个都可以以少于 n 路并行度的方式运行(通常更有效),但在其他任务完成后上升到 n 路。

那么我们为什么不把问题分解成 FJ 大小的部分,然后用一个线程池来解决这个问题。典型的 FJ 用法将问题分解为小块。以随机顺序执行这些操作需要在硬件级别上进行大量协调。间接费用将是一个杀手。在 FJ 中,任务被放入队列中,线程按后进先出顺序(LIFO/堆栈)读取该队列,并且工作窃取(通常在核心工作中)是先入先出(FIFO /“队列”)完成的。结果是长数组处理可以在很大程度上按顺序完成,即使它被分成小块。(在一次大爆炸中将问题分解为大小均匀的小块也可能并非易事。比如说在不平衡的情况下处理某种形式的层次结构。)

结论:FJ 允许在不均匀的情况下更有效地使用硬件线程,如果你有多个线程,这将总是如此。

于 2011-10-28T08:45:59.967 回答
23

线程池和 Fork/Join 的最终目标是相似的:两者都希望尽可能地利用可用的 CPU 能力来实现最大吞吐量。最大吞吐量意味着应该在很长一段时间内完成尽可能多的任务。需要做什么?(对于以下我们将假设不缺少计算任务:对于 100% 的 CPU 利用率,总是有足够的工作要做。此外,在超线程的情况下,我将“CPU”等效用于内核或虚拟内核)。

  1. 至少需要有与可用 CPU 一样多的线程运行,因为运行较少的线程将导致核心未使用。
  2. 运行的线程最多必须与可用 CPU 的数量一样多,因为运行更多线程将为调度程序创建额外的负载,调度程序将 CPU 分配给不同的线程,这会导致一些 CPU 时间用于调度程序而不是我们的计算任务。

因此我们发现,为了获得最大吞吐量,我们需要拥有与 CPU 完全相同数量的线程。在 Oracle 的模糊示例中,您既可以采用固定大小的线程池,其线程数等于可用 CPU 的数量,也可以使用线程池。这不会有什么不同,你是对的!

那么你什么时候会遇到线程池的麻烦呢?那就是如果线程阻塞,因为您的线程正在等待另一个任务完成。假设以下示例:

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}

我们这里看到的是一个由A、B、C三个步骤组成的算法。A和B可以相互独立执行,但是步骤C需要步骤A和B的结果。这个算法所做的就是将任务A提交给线程池并直接执行任务b。之后,线程将等待任务 A 也完成并继续执行步骤 C。如果 A 和 B 同时完成,那么一切都很好。但是如果 A 比 B 花费更长的时间呢?这可能是因为任务 A 的性质决定了它,但也可能是因为一开始没有任务 A 可用的线程,任务 A 需要等待。(如果只有一个 CPU 可用,因此您的线程池只有一个线程,这甚至会导致死锁,但现在这不是重点)。关键是刚刚执行任务B的线程阻塞整个线程。由于我们拥有与 CPU 相同数量的线程,并且一个线程被阻塞,这意味着一个 CPU 处于空闲状态

Fork/Join 解决了这个问题:在 fork/join 框架中,您将编写相同的算法,如下所示:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask());
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

看起来一样,不是吗?不过线索是aTask.join 不会挡。相反,这里是工作窃取发挥作用的地方:线程将四处寻找过去已经分叉的其他任务,并将继续执行这些任务。首先它检查它自己派生的任务是否已经开始处理。所以如果A还没有被另一个线程启动,它会接下来做A,否则它会检查其他线程的队列并窃取他们的工作。一旦另一个线程的其他任务完成,它将检查 A 现在是否完成。如果是上面的算法可以调用stepC。否则它将寻找另一个任务来窃取。因此,fork/join 池可以实现 100% 的 CPU 利用率,即使面对阻塞操作

join但是有一个陷阱:只有调用ForkJoinTasks才有可能窃取工作。它不能用于外部阻塞操作,例如等待另一个线程或等待 I/O 操作。那么,等待 I/O 完成是一项常见的任务呢?在这种情况下,如果我们可以向 Fork/Join 池添加一个额外的线程,该线程将在阻塞操作完成后再次停止,这将是第二好的事情。如果ForkJoinPool我们使用ManagedBlockers,实际上可以做到这一点。

斐波那契

RecursiveTask 的 JavaDoc 中有一个使用 Fork/Join 计算斐波那契数的示例。有关经典的递归解决方案,请参见:

public static int fib(int n) {
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

正如在 JavaDocs 中所解释的,这是一种计算斐波那契数的漂亮转储方法,因为该算法具有 O(2^n) 复杂度,而更简单的方法是可能的。但是这个算法非常简单易懂,所以我们坚持下去。假设我们想用 Fork/Join 来加速这个过程。一个简单的实现应该是这样的:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
   }
}

这个任务被拆分的步骤太短了,因此这将执行得非常糟糕,但是你可以看到框架通常是如何工作的:这两个加法可以独立计算,但是我们需要它们两者来构建最终的结果。所以一半在另一个线程中完成。用线程池做同样的事情而不发生死锁(可能,但几乎没有那么简单)。

只是为了完整性:如果您真的想使用这种递归方法计算斐波那契数,这里有一个优化版本:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return 1;
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

这使子任务变得更小,因为它们仅在n > 10 && getSurplusQueuedTaskCount() < 2为真时才被拆分,这意味着有超过 100 个方法调用要执行(n > 10),并且没有非常多的人任务已经在等待(getSurplusQueuedTaskCount() < 2)。

在我的计算机上(4 核(计算超线程时为 8),Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz)fib(50)使用经典方法需要 64 秒,而使用 Fork/Join 方法只需 18 秒是一个相当显着的收益,尽管没有理论上可能的那么多。

概括

  • 是的,在您的示例中,Fork/Join 与经典线程池相比没有优势。
  • 当涉及阻塞时,Fork/Join 可以显着提高性能
  • Fork/Join 规避了一些死锁问题
于 2016-05-16T15:33:54.213 回答
21

Fork/join 与线程池不同,因为它实现了工作窃取。从分叉/加入

与任何 ExecutorService 一样,fork/join 框架将任务分配给线程池中的工作线程。fork/join 框架与众不同,因为它使用了工作窃取算法。无事可做的工作线程可以从仍然忙碌的其他线程中窃取任务。

假设您有两个线程和 4 个任务 a、b、c、d,分别需要 1、1、5 和 6 秒。最初,a 和 b 分配给线程 1,c 和 d 分配给线程 2。在线程池中,这需要 11 秒。使用 fork/join,线程 1 完成并可以从线程 2 窃取工作,因此任务 d 最终将由线程 1 执行。线程 1 执行 a、b 和 d,线程 2 只是 c。总时间:8 秒,而不是 11 秒。

编辑:正如 Joonas 指出的,任务不一定预先分配给线程。fork/join 的思想是一个线程可以选择将一个任务拆分为多个子块。因此,重申上述内容:

我们有两个任务 (ab) 和 (cd),分别需要 2 秒和 11 秒。线程 1 开始执行 ab 并将其拆分为两个子任务 a & b。与线程 2 类似,它分为两个子任务 c 和 d。当线程 1 完成 a & b 后,它可以从线程 2 中窃取 d。

于 2011-10-28T09:46:46.120 回答
14

上面的每个人都是正确的,通过窃取工作来获得好处,但要扩展为什么会这样。

主要好处是工作线程之间的有效协调。工作必须拆分和重新组合,这需要协调。正如您在 AH 上面的回答中看到的那样,每个线程都有自己的工作列表。这个列表的一个重要属性是它是排序的(大任务在顶部,小任务在底部)。每个线程执行其列表底部的任务,并从其他线程列表的顶部窃取任务。

结果是:

  • 任务列表的头部和尾部可以独立同步,减少列表的争用。
  • 工作的重要子树由同一线程拆分和重组,因此这些子树不需要线程间协调。
  • 当一个线程窃取工作时,它会占用一大块,然后将其细分到自己的列表中
  • 工作钢化意味着螺纹几乎完全被利用,直到过程结束。

大多数其他使用线程池的分治方案需要更多的线程间通信和协调。

于 2012-06-22T14:11:47.843 回答
14

在此示例中,Fork/Join 没有增加任何价值,因为不需要分叉,并且工作负载在工作线程之间平均分配。Fork/Join 只会增加开销。

这是一篇关于这个主题的好文章。引用:

总的来说,我们可以说 ThreadPoolExecutor 是首选,其中工作负载在工作线程之间平均分配。为了能够保证这一点,您确实需要准确地知道输入数据的样子。相比之下,无论输入数据如何,ForkJoinPool 都提供了良好的性能,因此是一种更加稳健的解决方案。

于 2013-12-02T23:33:31.187 回答
8

另一个重要的区别似乎是,使用 FJ,您可以执行多个复杂的“加入”阶段。考虑来自http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html的合并排序,预拆分这项工作需要太多的编排。例如,您需要做以下事情:

  • 排序第一季度
  • 对第二季度进行排序
  • 合并前两个季度
  • 对第三季度进行排序
  • 对第四季度进行排序
  • 合并最后两个季度
  • 合并两半

您如何指定必须在与它们相关的合并之前进行排序等。

我一直在研究如何最好地为每个项目列表做某件事。我想我会预先拆分列表并使用标准的线程池。当工作不能预先拆分为足够多的独立任务但可以递归地拆分为彼此独立的任务时,FJ 似乎最有用(例如,排序一半是独立的,但将两个排序的一半合并为一个排序的整体不是)。

于 2012-09-05T16:23:28.590 回答
6

当您进行昂贵的合并操作时,F/J 也具有明显的优势。因为它拆分为树结构,所以您只执行 log2(n) 合并,而不是 n 合并与线性线程拆分。(这确实假设您拥有与线程一样多的处理器,但仍然是一个优势)对于家庭作业,我们必须通过对每个索引处的值求和来合并数千个二维数组(所有相同的维度)。使用 fork join 和 P 个处理器,当 P 接近无穷大时,时间接近 log2(n)。

1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9

于 2012-12-03T04:59:19.690 回答
3

您会对像爬虫这样的应用程序中的 ForkJoin 性能感到惊讶。这是您将学习的最佳教程。

Fork/Join 的逻辑很简单:(1)将每个大任务分离(fork)成更小的任务;(2) 在单独的线程中处理每个任务(必要时将它们分成更小的任务);(3)加入结果。

于 2015-10-19T15:27:59.037 回答
3

如果问题是我们必须等待其他线程完成(如数组排序或数组总和的情况),则应使用 fork join,因为 Executor(Executors.newFixedThreadPool(2)) 将因受限而阻塞线程数。在这种情况下,forkjoin 池将创建更多线程来掩盖阻塞线程以保持相同的并行性

来源: http ://www.oracle.com/technetwork/articles/java/fork-join-422606.html

执行器实现分治算法的问题与创建子任务无关,因为 Callable 可以自由地向其执行器提交新的子任务并以同步或异步方式等待其结果。问题在于并行性:当一个 Callable 等待另一个 Callable 的结果时,它会处于等待状态,从而浪费了处理另一个排队等待执行的 Callable 的机会。

通过 Doug Lea 的努力,在 Java SE 7 中添加到 java.util.concurrent 包中的 fork/join 框架填补了这一空白

来源: https ://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

池尝试通过动态添加、挂起或恢复内部工作线程来维持足够的活动(或可用)线程,即使某些任务暂停等待加入其他任务也是如此。但是,面对阻塞的 IO 或其他非托管同步,不能保证这样的调整

public int getPoolSize() 返回已启动但尚未终止的工作线程数。此方法返回的结果可能与 getParallelism() 不同,当创建线程以在其他线程被协作阻塞时保持并行性。

于 2016-04-20T23:33:56.833 回答
3

我想为那些没有太多时间阅读长答案的人添加一个简短的答案。比较取自《Applied Akka Patterns》一书:

您决定是使用 fork-join-executor 还是 thread-pool-executor 很大程度上取决于该调度程序中的操作是否会阻塞。fork-join-executor 为您提供最大数量的活动线程,而 thread-pool-executor 为您提供固定数量的线程。如果线程被阻塞,fork-join-executor 将创建更多线程,而 thread-pool-executor 不会。对于阻塞操作,通常最好使用线程池执行器,因为它可以防止线程数爆炸。更多“反应性”操作在 fork-join-executor 中更好。

于 2019-06-24T16:57:16.787 回答