所以,事实证明我错了。
当您将 a 配置ForkJoinPool
为parallelism
1 时,只有一个线程执行任务。main
线程在 上被阻塞ForkJoin.get()
。它实际上不执行任何任务。
也就是说,事实证明,提供确定性行为确实很棘手。以下是我必须纠正的一些问题:
ForkJoinPool
如果工作线程空闲时间足够长,则正在使用不同的工作线程(具有不同的名称)执行任务。例如,如果主线程在调试断点处挂起,则工作线程将变得空闲并关闭。当我恢复执行时,ForkJoinThread
会启动一个具有不同名称的新工作线程。为了解决这个问题,我必须提供一个自定义ForkJoinWorkerThreadFactory
实现,null
如果ForkJoinPool
已经有一个活的工作人员(这可以防止池创建多个工作人员),它就会返回。Random
即使工作线程关闭并再次返回,我也确保我的代码返回相同的实例。
- 具有不确定迭代顺序的集合,例如
HashMap
或HashSet
导致元素在每次运行时以不同的顺序抓取随机数。我通过使用LinkedHashMap
和纠正了这个问题LinkedHashSet
。
- 具有非确定性 hashCode() 实现的对象,例如
Enum.hashCode()
. 我忘记了这导致了什么问题,但我通过自己计算 hashCode() 而不是依赖内置方法来纠正它。
这是 ForkJoinWorkerThreadFactory 的示例实现:
class MyForkJoinWorkerThread extends ForkJoinWorkerThread
{
MyForkJoinWorkerThread(ForkJoinPool pool)
{
super(pool);
// Change thread name after ForkJoinPool.registerWorker() does the same
setName("DETERMINISTIC_WORKER");
}
}
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory()
{
private WeakReference<Thread> currentWorker = new WeakReference<>(null);
@Override
public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool)
{
// If the pool already has a live thread, wait for it to shut down.
Thread thread = currentWorker.get();
if (thread != null && thread.isAlive())
{
try
{
thread.join();
}
catch (InterruptedException e)
{
log.error("", e);
}
}
ForkJoinWorkerThread result = new MyForkJoinWorkerThread(pool);
currentWorker = new WeakReference<>(result);
return result;
}
};