2

只要我在池中给 ForkJoinPool一个额外的线程,它的执行速度就与 ExecutorService 一样快。以下是使用的三个类:Main、RunnableTask 和 ForkJoinTask。在 16 核盒子上运行,程序每次输出如下: Executor Time: 5002 ForkJoin Time: 5002

主类:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        runExecutor(80);
        runForkJoin(80);
    }

    public static void runForkJoin(int size) {
        ForkJoinPool fjp = new ForkJoinPool(17);
        long start = System.currentTimeMillis();
        fjp.invoke(new ForkJoinTask(size));
        System.out.println("ForkJoin Time: "
                + (System.currentTimeMillis() - start));
        fjp.shutdown();
    }

    public static void runExecutor(int size) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(16);
        CountDownLatch latch = new CountDownLatch(size);
        long start = System.currentTimeMillis();
        for (int i = 0; i < latch.getCount(); i++) {
            exec.submit(new RunnableTask(latch));
        }
        latch.await();
        System.out.println("Executor Time: "
                + (System.currentTimeMillis() - start));
        exec.shutdown();
    }
}

可运行类:

import java.util.concurrent.CountDownLatch;

public class RunnableTask implements Runnable {
    private CountDownLatch latch;

    public RunnableTask(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            latch.countDown();
        } catch (Exception e) {
        }
    }
}

递归任务类:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTask extends RecursiveTask {
    private List<RecursiveTask> tasks;
    private int size;

    public ForkJoinTask(int size) {
        super();
        this.tasks = new ArrayList<>();
        this.size = size;
    }

    @Override
    protected Object compute() {
        for (int i = 0; i < size; i++) {
            RecursiveTask task = new RecursiveTask() {
                @Override
                protected Object compute() {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                    return null;
                }
            };
            task.fork();
            tasks.add(task);
        }

        for (RecursiveTask task : tasks) {
            task.join();
        }
        return null;
    }
}
4

1 回答 1

3

您的个人任务使得两者都 ForkJoinPool可以ExecutorService比现在运行得更快,而且任何一个都不应该比另一个拥有实质性优势。

原因是如果单个计算任务是,Thread.sleep(1000)那么该任务不需要 CPU 资源。您可以增加线程数以匹配您的作业大小 (80),并在 1 秒多一点的时间内完成 80 秒的“工作”,因为线程不会真正竞争任何类型的资源。

至于 和 之间的比较,差异ForkJoinPoolExecutorService您的测试用例无关,因为您的工作不会导致任何应该作为进一步计算输入的内容(MapReduce中的“减少”步骤)。所以对你来说,它们都是具有不同 API 的线程池。

于 2013-03-27T19:32:07.937 回答