3

在我的应用程序中,我以不同的速率向 commonPool 提交两种任务。

任务1:

ForkJoinPool.managedBlock(
//...
Uninterruptibles.putUninterruptibly(blockingQueueQithMaxSize50, "a")
//...
);

任务2:

List<String> list = Lists.newLinkedList();
ForkJoinPool.managedBlock(
//...
Queues.drainUninterruptibly(blockingQueueWithMaxSize50, list, 1, 1, SECONDS);
//...
);

在某些场景下,当task-1类型的任务提交到池中的速率太高,blockingQueue 已满时,所有运行task-1类型任务的线程都在 put 上阻塞(线程数大约是 52)。但是仍在提交到池中的task-1task-2类型的新任务不会导致池中产生新的工作人员,这会导致所有后续任务在工作队列中排队,从而导致饥饿和死锁导致应用程序冻结。

有人可以帮我理解我在这里做错了什么吗?

经过一番挖掘,我发现了这些:

但看起来这个错误已在 java 7 本身中修复。

环境:

  • JDK:oracle-j2sdk1.8 | 1.8.0+更新20
  • 拱门:amd64
  • 操作系统:Debian Wheezy

常用池配置:

  • 可用处理器 = 2
  • 这意味着并行度 = 1
  • 所有其他配置都是默认的

更新 1

更多信息:

submit将 runnables 设置为ForkJoinPoolwhich 内部调用externalPush方法:

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a;
    int r = ThreadLocalRandom.getProbe();
    int ps = plock;
    WorkQueue[] ws = workQueues;
    if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            q.top = s + 1;                     // push on to deque
            q.qlock = 0;
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        q.qlock = 0;
    }
    fullExternalPush(task);
}

当我进行远程调试时,执行达到

q.top = s + 1; // push on to deque
q.qlock = 0;
if (n <= 1)
    signalWork(ws, q);
return;

但是n非常大,不小于或等于1,因此signalWork不会调用内部调用方法的tryAddWorker方法。

4

0 回答 0