2

我正在寻找一种ExecutorService按需创建线程达到预定义限制并在保持活动时间后销毁空闲线程的方法。

以下构造函数创建一个ThreadPoolExecutor具有固定线程数的:

// taken from Executors.newFixedThreadPool()
new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

所以我试图创建一个ExecutorService这样的:

// taken from Executors.newCachedThreadPool()
new ThreadPoolExecutor(0, nThreads,
    CACHED_POOL_SHUTDOWN_DELAY, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());

但它没有按预期工作,nThreads在使用时,Executor不会将新任务排入队列,而是抛出一个RejectedExecutionException. 我知道我可以为此实现一个处理程序,但这对我没有帮助。

如何创建Executor前面描述的内容?

4

2 回答 2

2

如果新任务无法排队,则会创建一个新线程,除非您已经达到最大核心池大小。在您的情况下,队列一次只能包含一个任务,因此如果您足够快地提交任务,您就会达到最大池大小并获得异常。

它适用于 CachedThreadPool,因为最大核心池大小很大 ( Integer.MAX_VALUE)。

您需要使用不同的队列,例如LinkedBlockingQueue固定线程池示例中的新队列。

旁注:检查实现文档可以帮助理解细节。特别是,类的execute方法ThreadPoolExecutor有:

   /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
于 2013-07-30T05:46:14.600 回答
2

我在那篇文章中找到了一种方法,它完全符合我的需要。
@assylias 我认出了您的答案并更改了队列实现。

现在我的代码如下所示:

parallelExecutor = new ThreadPoolExecutor(nThreads, nThreads,
    CACHED_POOL_SHUTDOWN_DELAY, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());

parallelExecutor.allowCoreThreadTimeOut(true);   // this is the magic

它像一个固定的线程池一样工作,但允许这些内核超时。

于 2013-07-30T05:56:06.167 回答