在我的应用程序中,我以不同的速率向 commonPool 提交两种任务。
任务1:
ForkJoinPool.managedBlock(
//...
Uninterruptibles.putUninterruptibly(blockingQueueQithMaxSize50, "a")
//...
);
任务2:
List<String> list = Lists.newLinkedList();
ForkJoinPool.managedBlock(
//...
Queues.drainUninterruptibly(blockingQueueWithMaxSize50, list, 1, 1, SECONDS);
//...
);
在某些场景下,当task-1类型的任务提交到池中的速率太高,blockingQueue 已满时,所有运行task-1类型任务的线程都在 put 上阻塞(线程数大约是 52)。但是仍在提交到池中的task-1和task-2类型的新任务不会导致池中产生新的工作人员,这会导致所有后续任务在工作队列中排队,从而导致饥饿和死锁导致应用程序冻结。
有人可以帮我理解我在这里做错了什么吗?
经过一番挖掘,我发现了这些:
但看起来这个错误已在 java 7 本身中修复。
环境:
- JDK:oracle-j2sdk1.8 | 1.8.0+更新20
- 拱门:amd64
- 操作系统:Debian Wheezy
常用池配置:
- 可用处理器 = 2
- 这意味着并行度 = 1
- 所有其他配置都是默认的
更新 1
更多信息:
我submit
将 runnables 设置为ForkJoinPool
which 内部调用externalPush
方法:
final void externalPush(ForkJoinTask<?> task) {
WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
int r = ThreadLocalRandom.getProbe();
int ps = plock;
WorkQueue[] ws = workQueues;
if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
q.top = s + 1; // push on to deque
q.qlock = 0;
if (n <= 1)
signalWork(ws, q);
return;
}
q.qlock = 0;
}
fullExternalPush(task);
}
当我进行远程调试时,执行达到
q.top = s + 1; // push on to deque
q.qlock = 0;
if (n <= 1)
signalWork(ws, q);
return;
但是n非常大,不小于或等于1,因此signalWork
不会调用内部调用方法的tryAddWorker
方法。