如果我有 2 个 CPU 并为 fork / join 框架安排 1000 个任务来处理,这些任务一次最多执行 2 个,还是会在同一个 CPU 上并行执行更多任务?(比如说,也许一个任务正在等待 I/O,在这种情况下 CPU 会变得空闲,另一个线程可以运行)
5 回答
如果您自己不包含任何限制,则不会应用任何限制,Java 将分叉尽可能多的线程(可能全部 1000 个,具体取决于系统限制)。这并不理想。如果您正在执行的计算可能需要一些 IO 时间,但即使在大量并发处理时也不受 IO 限制,那么您可能能够证明比可用 CPU 数量多运行一个线程是合理的。一次运行所有 1000 个是不明智的。
如果我有 2 个 CPU 并为 fork / join 框架安排 1000 个任务来处理,这些任务一次最多执行 2 个,还是会在同一个 CPU 上并行执行更多任务?
如果你有一个双核 CPU,你实际上一次只能执行 2 个线程。
根据ForkJoin 文档:
ForkJoinPool 是用给定的目标并行级别构造的; 默认情况下,等于可用处理器的数量。池尝试通过动态添加、挂起或恢复内部工作线程来维持足够的活动(或可用)线程,即使某些任务暂停等待加入其他任务也是如此。但是,面对阻塞的 IO 或其他非托管同步,不能保证这样的调整。
因此,它可能会在您的 2 个 CPU 上一次运行两个,如果 CPU 是超线程的,则可能一次运行四个(我不确定)。如果您对默认的并行级别不满意,您可以通过调用将并行级别作为参数的 ForkJoinPool 构造函数来指定请求的并行级别。
cpu上是否启用了超线程?如果是这样,您可以同时运行 2 个以上的进程。
超线程的工作原理是复制处理器的某些部分——那些存储架构状态的部分——但不复制主要的执行资源。这允许超线程处理器对主机操作系统表现为两个“逻辑”处理器,从而允许操作系统同时调度两个线程或进程。
我做了一个测试来验证这一点:
import java.util.concurrent.*;
public class Test {
private static class TestAction extends RecursiveAction {
private int i;
public TestAction(int i) {
this.i = i;
}
protected void compute() {
if (i == 0) {
invokeAll(new TestAction(1), new TestAction(2), new TestAction(3),
new TestAction(4), new TestAction(5), new TestAction(6));
return;
}
System.out.println(i + " start");
try { Thread.sleep(2000); } catch (Exception e) { }
System.out.println(i + " end");
}
}
public static void main(String[] args) {
new ForkJoinPool().invoke(new TestAction(0));
}
}
使用参考 Oracle 实现运行的结果是:
1 start
6 start <- wait 2 seconds
1 end
2 start
6 end
5 start <- wait 2 seconds
2 end
3 start
5 end
4 start <- wait 2 seconds
4 end
3 end
Linux 和 Mac OS X 上的相同行为是一致的。
所以问题的答案是:是的,任务将在并行参数指定的 CPU 数量(或默认情况下的可用 CPU 总数)上执行。如果 CPU 时间可用并且任务只是阻塞等待某事,那么框架将不会自动执行任何操作来运行其他任务。
由于到目前为止我看到的文档对于如果 CPU 空闲,框架应该做什么非常模糊,这可能是一个实现细节。
默认情况下,Fork/Join 框架尝试保持线程数等于内核数减一(如果是单核机器,则创建一个线程)。makeCommonPool
您可以在类的方法中看到此代码ForkJoinPool
。
如果您认为这未充分利用您的 CPU,您可以为parallelism
.
但最有趣的是,有一种方法可以让 ForkJoinPool 在当前线程占用 CPU 阻塞 IO 时创建更多线程。block
您所要做的就是在对象方法的实现中实现实际上阻塞在 IO 上的代码块ForkJoinPool.ManagedBlocker
,并将该ManagedBlocker
对象传递给类的managedBlock
方法ForkJoinPool
。完成后,ForkJoinPool
检查调用此方法的当前线程是否是 a 的实例ForkJoinPoolWorkerThread
。如果是,则ForkjoinPool
通过创建可以接管 CPU 的新线程来进行补偿。
ForkJoinPool fjp = ForkJoinPool.common();
Runnable task = new Runnable(){
public void run(){
//Some cpu-intensive code
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker(){
public boolean isReleasable(){
//return true if an IO/blocking operation is to be done.
}
public boolean block(){
//Do an IO Operation here
//return true if all blocking code has finished execution.
//return false if more blocking code is yet to execute.
}
});
//Some more CPU intensive code here
}
};
fjp.submit(task);