18

我需要在Java(java.util.concurrent)中实现一个线程池,其线程数在空闲时处于某个最小值,当作业提交到其中的速度比完成执行快时增长到上限(但永远不会进一步) , 并在所有作业完成且不再提交作业时缩回到下限。

你将如何实现这样的事情?我想这将是一个相当常见的使用场景,但显然java.util.concurrent.Executors工厂方法只能创建固定大小的池和在提交许多作业时无限增长的池。该类ThreadPoolExecutor提供corePoolSizemaximumPoolSize参数,但其文档似乎暗示同时拥有多个线程的唯一方法corePoolSize是使用有界作业队列,在这种情况下,如果您已经到达maximumPoolSize线程,您将获得工作您必须自己处理的拒绝?我想出了这个:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

我忽略了什么吗?有一个更好的方法吗?(total number of jobs) - maxSize此外,根据一个人的要求,上述代码在至少(我认为)工作完成之前不会完成可能会有问题。因此,如果您希望能够将任意数量的作业提交到池中并立即继续,而无需等待其中任何一个完成,我看不出如果没有专门的“作业 sumitting”线程来管理,您将如何做到这一点保存所有提交的作业所需的无界队列。AFAICS,如果您为 ThreadPoolExecutor 本身使用无界队列,它的线程数将永远不会超过 corePoolSize。

4

3 回答 3

13

当增长和收缩与线程一起出现时,我脑海中只有一个名字:来自 java.util.concurrent包的CachedThreadPool 。

ExecutorService executor = Executors.newCachedThreadPool();

CachedThreadPool() 可以重用线程,以及在需要时创建新线程。是的,如果一个线程空闲 60 秒,CachedThreadPool 会杀死它。所以这是相当轻量级的——用你的话来说是增长和缩小!

于 2012-06-28T17:40:38.537 回答
6

可能对您有所帮助的一个技巧是分配一个RejectedExecutionHandler使用相同线程将作业提交到阻塞队列中的方法。这将阻塞当前线程并消除对某种循环的需要。

在这里查看我的答案:

如果需要处理的数据太多,如何让 ThreadPoolExecutor 命令等待?

这是从该答案复制的拒绝处理程序。

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

只要您意识到在核心线程之上创建任何线程之前首先使用的有界阻塞队列已填满,您就应该能够使用核心/最大线程数。因此,如果您有 10 个核心线程并且希望第 11 个作业启动第 11 个线程,那么不幸的是,您需要一个大小为 0 的阻塞队列(可能是 a SynchronousQueue)。我觉得这在其他很棒的ExecutorService课程中是一个真正的限制。

于 2012-06-28T16:58:31.650 回答
1

设置maximumPoolSizeInteger.MAX_VALUE。如果你曾经拥有超过 20 亿个线程……那么,祝你好运。

无论如何,ThreadPoolExecutor状态的Javadoc:

通过将 maximumPoolSize 设置为基本上无界的值,例如 Integer.MAX_VALUE,您允许池容纳任意数量的并发任务。最典型的是,核心和最大池大小仅在构造时设置,但它们也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 动态更改。

使用类似的无界任务队列 a LinkedBlockingQueue,这应该具有任意大的容量。

于 2012-06-28T16:58:41.967 回答