-1

我有以下测试代码

public static void main(String[] args){
    ForkJoinPool pool = new ForkJoinPool(2);

    ForkJoinTask task3 = ForkJoinTask.adapt(() -> {
        System.out.println("task 3 executing");
        for(int i = 0; i < 10; ++i){
            System.out.println("task 3 doing work " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    ForkJoinTask task2 = ForkJoinTask.adapt(() -> {
        try {
            System.out.println("task 2 executing");
            Thread.sleep(5000);
            System.out.println("task 2 finishing");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    });

    pool.submit(task2);

    ForkJoinTask task1 = pool.submit(() -> {
        System.out.println("task 1 executing");
        pool.submit(task3); // EDIT: Original code was task3.fork();
        System.out.println("task 1 joining task 2");
        task2.join();
        System.out.println("task 1 finished");
    });

    task1.join();
}

它基本上将 3 个任务提交给并行度为 2 的 ForkJoinPool,任务 2 和 3 长时间运行,任务 1 等待任务 2。

标记 2 个线程 t1 和 t2,其中 t1 执行 task1,t2 执行 task2。

在我的理解中,工作窃取魔法发生在 join() 调用中,调用线程将从自己的工作队列或其他工作线程的工作队列中执行任务。因此,我期望 t1 执行 task1,看到 join() 调用然后窃取 task3 并执行它以完成。

然而,在实践中,t1 并没有对 join() 调用做任何特别的事情。Task3 仅在 task1 和 task2 都完成后执行。为什么会这样?

4

1 回答 1

0

在花了几个小时查看 ForkJoinPool 和 ForkJoinTask 的源代码之后,我发现了以下内容:

如果满足以下两个条件之一,join() 将导致线程查看和窃取任务:

  1. 正在加入的任务位于当前工作线程的工作队列的顶部,在这种情况下工作线程将继续执行该任务(见下文)

  2. 另一个工作线程的工作队列中有任务,但只有当该工作线程从当前工作线程中窃取了一个任务时,当前工作线程才会偷回一个任务并执行它(见下文)

对于第一种情况,我主要从doJoin()ForkJoinTask.java 中的方法推断出来,下面是一个说明该情况的工作测试:

public static void main(String[] args){
    ForkJoinPool pool = new ForkJoinPool(2);

    ForkJoinTask task3 = ForkJoinTask.adapt(() -> {
        System.out.println("task 3 executing on thread " + Thread.currentThread());
        for(int i = 0; i < 10; ++i){
            System.out.println("task 3 doing work " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    ForkJoinTask task2 = ForkJoinTask.adapt(() -> {
        try {
            System.out.println("task 2 executing on thread " + Thread.currentThread());
            Thread.sleep(5000);
            System.out.println("task 2 finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    });

    ForkJoinTask task1 = ForkJoinTask.adapt(() -> {
        System.out.println("task 1 executing on thread " + Thread.currentThread());
        pool.submit(task3);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task 1 joining task 3");
        task3.join();
        System.out.println("task 1 finished");
    });

    pool.submit(task2);
    pool.submit(task1);

    task1.join();
}

输出是

task 1 executing on thread Thread[ForkJoinPool-1-worker-2,5,main]
task 2 executing on thread Thread[ForkJoinPool-1-worker-1,5,main]
task 1 joining task 3
task 3 executing on thread Thread[ForkJoinPool-1-worker-2,5,main]
task 3 doing work 0
task 3 doing work 1
task 3 doing work 2
task 3 doing work 3
task 2 finished
task 3 doing work 4
task 3 doing work 5
task 3 doing work 6
task 3 doing work 7
task 3 doing work 8
task 3 doing work 9
task 1 finished

Task3 和 task1 在同一个工作线程上执行,这是意料之中的,因为 task3 直接提交到 thread2 的工作队列,因此根据案例 1,它应该在 task1 调用 join() 时执行。

我根据 ForkJoinPool.java 中的方法推导出了第二种情况awaitJoin(),下面是一个说明该情况的工作测试

public static void main(String[] args){
    ForkJoinPool pool = new ForkJoinPool(2);

    ForkJoinTask task3 = ForkJoinTask.adapt(() -> {
        System.out.println("task 3 executing on thread " + Thread.currentThread());
        for(int i = 0; i < 10; ++i){
            System.out.println("task 3 doing work " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    ForkJoinTask task2 = ForkJoinTask.adapt(() -> {
        try {
            System.out.println("task 2 executing on thread " + Thread.currentThread());
            pool.submit(task3);
            Thread.sleep(5000);
            System.out.println("task 2 finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    });

    ForkJoinTask task1 = ForkJoinTask.adapt(() -> {
        System.out.println("task 1 executing on thread " + Thread.currentThread());
        pool.submit(task2);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task 1 joining task 2");
        task2.join();
        System.out.println("task 1 finished");
    });

    pool.submit(task1);

    task1.join();
    task2.join();
    task3.join();
}

和输出

task 1 executing on thread Thread[ForkJoinPool-1-worker-1,5,main]
task 2 executing on thread Thread[ForkJoinPool-1-worker-2,5,main]
task 1 joining task 2
task 3 executing on thread Thread[ForkJoinPool-1-worker-1,5,main]
task 3 doing work 0
task 3 doing work 1
task 3 doing work 2
task 3 doing work 3
task 2 finished
task 3 doing work 4
task 3 doing work 5
task 3 doing work 6
task 3 doing work 7
task 3 doing work 8
task 3 doing work 9
task 1 finished

Task3 在 thread1 上执行,因为 task1 正在等待 task2,这是可能的,因为 task2 已提交到 thread1 的工作队列,但由于 thread2 是空闲的,它窃取了任务可以成为 thread1 的窃取者。当 thread1 看到来自 task1 的 join() 调用时,它会查看窃取者 (thread2) 的工作队列并找到 task3,将其拿走并执行。

另请注意,task1 仅在 task3 之后才完成执行,这意味着一旦线程窃取了任务,它必须执行它直到完成。

现在对于原始问题,我已经在非 ForkJoinWorkerThread(主线程)中提交了 task1 和 task2,因此没有工作线程相互窃取,因此第二种情况不适用。此外,由于我在 thread2 的工作队列中的第二个任务上调用了 join(),因此第一种情况不适用,因此不会发生窃取。

编辑:这绝不是java中F/J的答案,如果有任何问题请指出。事实上,挖掘所有这些细节只会产生更多的问题:也就是说,为什么工作线程不接受任意任务并运行它?为什么它必须来自窃取者或它自己的工作队列?如果您有答案,请发表评论/发布。

于 2018-04-22T22:09:47.550 回答