是否有与 Java 8 结合使用的待处理任务队列Executors.newWorkStealingPool()
?
例如,假设 #available cores 是 2,并且Executors.newWorkStealingPool()
是空的,因为 2 个任务已经在运行。那么如果将第三个任务提交给工作窃取执行者会发生什么?是排队吗?如果是,那么所述队列上的界限是什么?
提前致谢。
是否有与 Java 8 结合使用的待处理任务队列Executors.newWorkStealingPool()
?
例如,假设 #available cores 是 2,并且Executors.newWorkStealingPool()
是空的,因为 2 个任务已经在运行。那么如果将第三个任务提交给工作窃取执行者会发生什么?是排队吗?如果是,那么所述队列上的界限是什么?
提前致谢。
是否有与 Java 8 的 Executors.newWorkStealingPool() 一起使用的待处理任务队列?
是的,每个线程都有自己的双端队列支持。当一个线程完成它的任务时,它会从其他线程的双端队列中获取任务并执行它。
如果是,那么所述队列上的界限是什么?
队列的最大大小受数量限制:static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
当队列已满时,会抛出未经检查的异常:RejectedExecutionException("Queue capacity exceeded")
来自Executors和ForkJoinPool的 grepcode
Executors
. newWorkStealingPool
返回ForkJoinPool
执行者:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
ForkJoinPool:
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
开execute()
:
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
externalPush
调用externalSubmit
,您可以WorkQueue
在该实现中查看详细信息。
外部提交:
// 外部操作
/**
* Full version of externalPush, handling uncommon cases, as well
* as performing secondary initialization upon the first
* submission of the first task to the pool. It also detects
* first submission by an external thread and creates a new shared
* queue if the one at index if empty or contended.
*
* @param task the task. Caller must ensure non-null.
*/
WorkQueue
您可以在课堂上找到有关队列大小的更多详细信息
static final class WorkQueue {
文档WokrQueue
:
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. The @Contended annotation alerts
* JVMs to try to keep instances apart.
*/
@sun.misc.Contended
/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two; at least 4, but should be larger to
* reduce or eliminate cacheline sharing among queues.
* Currently, it is much larger, as a partial workaround for
* the fact that JVMs often place arrays in locations that
* share GC bookkeeping (especially cardmarks) such that
* per-write accesses encounter serious memory contention.
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M