5

当队列已满时,我需要阻止 ForkJoinPool 上的线程。这可以在标准的 ThreadPoolExecutor 中完成,例如:

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            5000L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}

我知道,ForkJoinPool 中有一些 Dequeue,但我无法通过它的 API 访问它。

更新:请看下面的答案。

4

1 回答 1

11

经过一番研究,我很高兴回答这个问题:

原因: 由于以下原因,ForkJoinPool 的实现中没有这样的选项。大多数 juc Executors 假设单个并发队列和许多线程。当多个线程读取/写入队列时,这会导致队列争用并降低性能。因此,这种方法的可扩展性不高 --> 队列上的高竞争会产生大量的上下文切换和 CPU 业务。

实现:在 ForkJoinPool 中,每个线程都有一个由数组支持 的单独的双端队列(Deque )。为了尽量减少争用,工作窃取发生在双端队列的尾部,而任务提交发生在当前线程(工作线程)的头部。尾部包含最大部分的工作。换句话说,另一个工作线程从尾部窃取可以最小化与其他工作线程交互的次数 --> 更少的争用,更好的整体性能。

解决方法: 有全局提交队列。来自非 FJ 线程的提交进入提交队列(Worker 接受这些任务)。还有上面提到的工人队列。

队列的最大大小受数量限制:

   /**
     * Maximum size for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway
     * programs before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

当队列已满时,会抛出未经检查的异常:

RejectedExecutionException("Queue capacity exceeded")

这在 javadocs 中有描述。

(另请参阅 ThreadPool 的构造函数UncaughtExceptionHandler

我倾向于声称当前的实现没有这样的机制,这应该由我们在消费 API 中实现。

例如,这可以按如下方式完成:

  1. 实现指数回退逻辑,通过增加下一次重试的时间间隔来尝试定期重新提交任务。或者..
  2. 编写一个节流器,定期检查 submitQueue 的大小(请参阅 参考资料ForkJoinPool.getQueuedSubmissionCount())。

是 ForkJoinPool 的官方 JSR-166E java 代码以获取更多信息。

于 2015-05-05T06:22:09.230 回答