5

我正在阅读 Java ForkJoin 框架。不直接调用(例如)invoke()的实现,而是实例化和调用,有什么额外的好处?当我们调用这 2 个方法时,到底会发生什么?ForkJoinTaskRecursiveTaskForkJoinPoolpool.invoke(task)invoke

从源头上看,如果recursiveTask.invoke被调用,它会以托管线程池的方式调用它的execand 最终compute。因此,为什么我们有成语更加令人困惑pool.invoke(task)

我写了一些简单的代码来测试性能差异,但我没有看到任何东西。也许测试代码是错误的?见下文:

public class MyForkJoinTask extends RecursiveAction {

    private static int totalWorkInMillis = 20000;
    protected static int sThreshold = 1000;

    private int workInMillis;


    public MyForkJoinTask(int work) {
        this.workInMillis = work;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        try {

            ForkJoinTask<Object> objectForkJoinTask = new ForkJoinTask<>();
            Thread.sleep(workInMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void compute() {
        if (workInMillis < sThreshold) {
            computeDirectly();
            return;
        }

        int discountedWork = (int) (workInMillis * 0.9);
        int split = discountedWork / 2;

        invokeAll(new MyForkJoinTask(split),
                new MyForkJoinTask(split));
    }

    public static void main(String[] args) throws Exception {
        System.out.printf("Total work is %d in millis.%n", totalWorkInMillis);
        System.out.printf("Threshold is %d in millis.%n", sThreshold);

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        MyForkJoinTask fb = new MyForkJoinTask(totalWorkInMillis);

        ForkJoinPool pool = new ForkJoinPool();

        long startTime = System.currentTimeMillis();


        // These 2 seems no difference!
        pool.invoke(fb);
//        fb.compute();


        long endTime = System.currentTimeMillis();

        System.out.println("Took " + (endTime - startTime) +
                " milliseconds.");
    }
}
4

1 回答 1

7

类的compute()方法RecursiveTask只是一个包含任务代码的抽象方法。它不使用池中的新线程,如果您正常调用它,它不会在池管理的线程中运行。

invokefork join 池上的方法将任务提交到池中,然后该池开始在单独的线程上运行,调用该compute线程上的方法,然后等待结果。

您可以在 Java 文档中 RecursiveTask 和 ForkJoinPool 的措辞中看到这一点。该invoke()方法实际上执行任务,而该compute()方法只是封装了计算。

protected abstract V compute()

此任务执行的主要计算。

和 ForkJoinPool

public <T> T invoke(ForkJoinTask<T> task)

执行给定的任务,完成后返回其结果。...

因此,使用计算方法,您正在做的是compute在分叉连接池之外运行第一次调用。您可以通过在计算方法中添加日志行来测试这一点。

System.out.println(this.inForkJoinPool());

您还可以通过记录线程 ID 来检查它是否在同一线程中运行

System.out.println(Thread.currentThread().getId());

一旦您调用invokeAll,该调用中包含的子任务就会在池中运行。但请注意,它不一定在您调用之前创建的池中运行compute()。你可以注释掉你的new ForkJoinPool()代码,它仍然会运行。有趣的是,java 7 文档说invokeAll()如果在池管理线程之外调用该方法将引发异常,但 java 8 文档没有。我还没有在 java 7 中测试过它(只有 8 个)。compute()但很有可能,您的代码在 java 7 中直接调用时会抛出异常。

两个结果返回相同时间的原因是毫秒不够准确,无法记录在池管理线程中启动第一个线程或仅compute在现有线程中运行第一个调用的差异。

Sierra 和 Bates 的 OCA/OCP 学习指南建议您使用 fork join 框架的方式是invoke()从池中调用。它明确了您正在使用哪个池,也意味着您可以将多个任务提交到同一个池,从而节省了每次重新创建新池的开销。从逻辑上讲,将所有任务计算保留在池管理线程中也更干净(或者至少我认为是这样)。

pool.invoke()在特定池上调用调用;而不是让框架在第一次调用task.invoke或时创建一个。task.invokeAll这意味着您可以将池重新用于新任务,并在创建池时指定活动线程数等内容。这就是区别。将这些日志行添加到您的代码中,使用它,您会看到它在做什么

于 2015-12-07T23:39:04.910 回答