2

我正在尝试对一些大数据实施分而治之的解决方案。我使用 fork 和 join 将事物分解为线程。但是我有一个关于分叉机制的问题:如果我将分而治之的条件设置为:

@Override
protected SomeClass compute(){
    if (list.size()<LIMIT){
        //Do something here
        ...
    }else{
        //Divide the list and invoke sub-threads
        SomeRecursiveTaskClass subWorker1 = new SomeRecursiveTaskClass(list.subList());
        SomeRecursiveTaskClass subWorker2 = new SomeRecursiveTaskClass(list.subList());
        invokeAll(subWorker1, subWorker2);
        ...
    }
}

如果没有足够的资源来调用subWorker(例如池中没有足够的线程)会发生什么?Fork/Join 框架是否为可用线程维护池大小?还是应该将此条件添加到我的分治逻辑中?

4

1 回答 1

4

每个ForkJoinPool都有一个配置的目标并行度。这与线程数不完全匹配,即如果工作线程将通过 a 等待ManagedBlocker,则池可能会启动更多线程来进行补偿。默认的并行度commonPool为“CPU 内核数减一”,因此当将启动的非池线程合并为帮助程序时,生成的并行度将使用所有 CPU 内核。

当您提交的作业多于线程时,它们将被排队。将一些作业排入队列可以帮助利用线程,因为并非所有作业都可能在完全相同的时间运行,所以线程耗尽工作可能会从其他线程中窃取作业,但是过多地拆分工作可能会产生不必要的开销。

因此,您可以使用ForkJoinTask.getSurplusQueuedTaskCount()获取当前待处理作业的数量,这些作业不太可能被其他线程窃取,并且仅在低于一个小阈值时才拆分。正如其文档所述:

该值对于是否分叉其他任务的启发式决策可能很有用。在 ForkJoinTasks 的许多用法中,在稳定状态下,每个工作人员应该致力于保持少量的恒定剩余任务(例如,3 个),并在超过此阈值时在本地处理计算。

So this is the condition to decide whether to split your jobs further. Since this number reflects when idle threads steal your created jobs, it will cause balancing when the jobs have different CPU load. Also, it works the other way round, if the pool is shared (like the common pool) and threads are already busy, they will not pick up your jobs, the surplus count will stay high and you will automatically stop splitting then.

于 2018-01-09T18:26:32.513 回答