在花了几个小时查看 ForkJoinPool 和 ForkJoinTask 的源代码之后,我发现了以下内容:
如果满足以下两个条件之一,join() 将导致线程查看和窃取任务:
正在加入的任务位于当前工作线程的工作队列的顶部,在这种情况下工作线程将继续执行该任务(见下文)
另一个工作线程的工作队列中有任务,但只有当该工作线程从当前工作线程中窃取了一个任务时,当前工作线程才会偷回一个任务并执行它(见下文)
对于第一种情况,我主要从doJoin()
ForkJoinTask.java 中的方法推断出来,下面是一个说明该情况的工作测试:
public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool(2);
ForkJoinTask task3 = ForkJoinTask.adapt(() -> {
System.out.println("task 3 executing on thread " + Thread.currentThread());
for(int i = 0; i < 10; ++i){
System.out.println("task 3 doing work " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
ForkJoinTask task2 = ForkJoinTask.adapt(() -> {
try {
System.out.println("task 2 executing on thread " + Thread.currentThread());
Thread.sleep(5000);
System.out.println("task 2 finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
ForkJoinTask task1 = ForkJoinTask.adapt(() -> {
System.out.println("task 1 executing on thread " + Thread.currentThread());
pool.submit(task3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task 1 joining task 3");
task3.join();
System.out.println("task 1 finished");
});
pool.submit(task2);
pool.submit(task1);
task1.join();
}
输出是
task 1 executing on thread Thread[ForkJoinPool-1-worker-2,5,main]
task 2 executing on thread Thread[ForkJoinPool-1-worker-1,5,main]
task 1 joining task 3
task 3 executing on thread Thread[ForkJoinPool-1-worker-2,5,main]
task 3 doing work 0
task 3 doing work 1
task 3 doing work 2
task 3 doing work 3
task 2 finished
task 3 doing work 4
task 3 doing work 5
task 3 doing work 6
task 3 doing work 7
task 3 doing work 8
task 3 doing work 9
task 1 finished
Task3 和 task1 在同一个工作线程上执行,这是意料之中的,因为 task3 直接提交到 thread2 的工作队列,因此根据案例 1,它应该在 task1 调用 join() 时执行。
我根据 ForkJoinPool.java 中的方法推导出了第二种情况awaitJoin()
,下面是一个说明该情况的工作测试
public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool(2);
ForkJoinTask task3 = ForkJoinTask.adapt(() -> {
System.out.println("task 3 executing on thread " + Thread.currentThread());
for(int i = 0; i < 10; ++i){
System.out.println("task 3 doing work " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
ForkJoinTask task2 = ForkJoinTask.adapt(() -> {
try {
System.out.println("task 2 executing on thread " + Thread.currentThread());
pool.submit(task3);
Thread.sleep(5000);
System.out.println("task 2 finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
ForkJoinTask task1 = ForkJoinTask.adapt(() -> {
System.out.println("task 1 executing on thread " + Thread.currentThread());
pool.submit(task2);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task 1 joining task 2");
task2.join();
System.out.println("task 1 finished");
});
pool.submit(task1);
task1.join();
task2.join();
task3.join();
}
和输出
task 1 executing on thread Thread[ForkJoinPool-1-worker-1,5,main]
task 2 executing on thread Thread[ForkJoinPool-1-worker-2,5,main]
task 1 joining task 2
task 3 executing on thread Thread[ForkJoinPool-1-worker-1,5,main]
task 3 doing work 0
task 3 doing work 1
task 3 doing work 2
task 3 doing work 3
task 2 finished
task 3 doing work 4
task 3 doing work 5
task 3 doing work 6
task 3 doing work 7
task 3 doing work 8
task 3 doing work 9
task 1 finished
Task3 在 thread1 上执行,因为 task1 正在等待 task2,这是可能的,因为 task2 已提交到 thread1 的工作队列,但由于 thread2 是空闲的,它窃取了任务可以成为 thread1 的窃取者。当 thread1 看到来自 task1 的 join() 调用时,它会查看窃取者 (thread2) 的工作队列并找到 task3,将其拿走并执行。
另请注意,task1 仅在 task3 之后才完成执行,这意味着一旦线程窃取了任务,它必须执行它直到完成。
现在对于原始问题,我已经在非 ForkJoinWorkerThread(主线程)中提交了 task1 和 task2,因此没有工作线程相互窃取,因此第二种情况不适用。此外,由于我在 thread2 的工作队列中的第二个任务上调用了 join(),因此第一种情况不适用,因此不会发生窃取。
编辑:这绝不是java中F/J的答案,如果有任何问题请指出。事实上,挖掘所有这些细节只会产生更多的问题:也就是说,为什么工作线程不接受任意任务并运行它?为什么它必须来自窃取者或它自己的工作队列?如果您有答案,请发表评论/发布。