137

似乎不可能创建一个对可以创建的线程数有限制的缓存线程池。

下面是静态Executors.newCachedThreadPool在标准 Java 库中的实现方式:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因此,使用该模板继续创建一个固定大小的缓存线程池:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

现在,如果您使用它并提交 3 个任务,一切都会好起来的。提交任何进一步的任务将导致拒绝执行异常。

试试这个:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

将导致所有线程顺序执行。即,线程池永远不会产生多个线程来处理您的任务。

这是 ? 的执行方法中的错误ThreadPoolExecutor?或者,也许这是故意的?还是有其他方法?

编辑:我想要与缓存线程池完全一样的东西(它按需创建线程,然后在超时后将它们杀死),但它可以创建的线程数有限制,并且一旦它有继续排队其他任务的能力达到其线程限制。根据 sjlee 的回应,这是不可能的。看它的execute()方法ThreadPoolExecutor确实是不可能的。我需要像确实那样子类化ThreadPoolExecutor和覆盖,但它所做的是一个完整的hack。execute()SwingWorkerSwingWorkerexecute()

4

14 回答 14

243

有以下ThreadPoolExecutor几个关键行为,你的问题可以通过这些行为来解释。

提交任务时,

  1. 如果线程池还没有达到核心大小,它会创建新线程。
  2. 如果已达到核心大小并且没有空闲线程,它会将任务排队。
  3. 如果已达到核心大小,没有空闲线程,并且队列已满,它会创建新线程(直到达到最大大小)。
  4. 如果已达到最大大小,没有空闲线程,并且队列已满,则拒绝策略启动。

在第一个示例中,请注意 的SynchronousQueue大小本质上为 0。因此,当您达到最大大小 (3) 时,拒绝策略就会启动 (#4)。

在第二个示例中,选择的队列是一个LinkedBlockingQueue大小不受限制的队列。因此,您会陷入行为 #2。

您不能真正修改缓存类型或固定类型,因为它们的行为几乎完全确定。

如果你想拥有一个有界和动态的线程池,你需要使用一个正的核心大小和最大大小以及一个有限大小的队列。例如,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

附录:这是一个相当古老的答案,似乎 JDK 在核心大小为 0 时改变了它的行为。从 JDK 1.6 开始,如果核心大小为 0 并且池没有任何线程,则 ThreadPoolExecutor 将添加一个线程来执行该任务。因此,核心大小为 0 是上述规则的一个例外。感谢史蒂夫让我注意到这一点

于 2009-11-25T23:11:00.500 回答
64

除非我错过了什么,否则原始问题的解决方案很简单。以下代码实现了原始海报所描述的所需行为。它将产生多达 5 个线程来处理无界队列,空闲线程将在 60 秒后终止。

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);
于 2011-11-14T17:24:56.290 回答
8

有同样的问题。由于没有其他答案将所有问题放在一起,我添加了我的:

现在在文档中清楚地写了:如果您使用不阻塞的队列(LinkedBlockingQueue)最大线程设置无效,则仅使用核心线程。

所以:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

该执行人具有:

  1. 没有最大线程的概念,因为我们使用的是无界队列。这是一件好事,因为这样的队列可能会导致执行程序创建大量非核心的额外线程,如果它遵循其通常的策略。

  2. 最大大小的队列Integer.MAX_VALUESubmit()如果挂起RejectedExecutionException的任务数量超过Integer.MAX_VALUE. 不确定我们会先耗尽内存,否则会发生这种情况。

  3. 可能有 4 个核心线程。如果空闲 5 秒,空闲核心线程会自动退出。所以,是的,严格按需线程。数量可以使用setThreads()方法改变。

  4. 确保最小核心线程数不小于一个,否则submit()将拒绝每个任务。由于核心线程需要>= max threads,因此该方法setThreads()也设置了 max threads,尽管 max thread 设置对于无界队列是无用的。

于 2013-08-10T14:35:12.207 回答
7

在您的第一个示例中,后续任务将被拒绝,因为AbortPolicy是默认RejectedExecutionHandler。ThreadPoolExecutor 包含以下策略,您可以通过setRejectedExecutionHandler方法更改它们:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

听起来您想要带有 CallerRunsPolicy 的缓存线程池。

于 2009-11-25T22:36:54.750 回答
5

这里的答案都没有解决我的问题,这与使用 Apache 的 HTTP 客户端(3.x 版本)创建有限数量的 HTTP 连接有关。由于我花了几个小时才找到一个好的设置,我将分享:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

这将创建一个ThreadPoolExecutor以 5 开头并最多容纳 10 个同时运行的线程,CallerRunsPolicy用于执行。

于 2010-12-05T01:01:38.913 回答
3

根据 ThreadPoolExecutor 的 Javadoc:

如果运行的线程数多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建新线程。通过将 corePoolSize 和 maximumPoolSize 设置为相同,您可以创建一个固定大小的线程池。

(强调我的。)

jitter 的答案就是你想要的,尽管我的答案是你的另一个问题。:)

于 2009-11-25T22:30:04.973 回答
2

这就是你想要的(至少我猜是这样)。如需解释,请查看Jonathan Feinberg 的回答

Executors.newFixedThreadPool(int n)

创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多 nThreads 个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,新的线程将取代它。池中的线程将一直存在,直到显式关闭。

于 2009-11-25T22:23:20.827 回答
2

还有一种选择。除了使用新的 SynchronousQueue 之外,您还可以使用任何其他队列,但您必须确保其大小为 1,这样会强制 executorservice 创建新线程。

于 2011-08-29T10:10:46.020 回答
2

看起来好像没有任何答案实际上回答了这个问题 - 事实上我看不到这样做的方法 - 即使你从 PooledExecutorService 子类化,因为许多方法/属性是私有的,例如使 addIfUnderMaximumPoolSize 受到保护你可以请执行下列操作:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

我得到的最接近的是这个 - 但即使这也不是一个很好的解决方案

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

ps 上面没有测试

于 2011-10-14T23:39:18.673 回答
2

这是另一个解决方案。我认为此解决方案的行为如您所愿(尽管对此解决方案并不感到自豪):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
于 2011-10-15T01:18:23.720 回答
0
  1. 您可以按照@sjleeThreadPoolExecutor的建议使用

    您可以动态控制池的大小。看看这个问题了解更多细节:

    动态线程池

    或者

  2. 您可以使用java 8 引入的newWorkStealingPool API。

    public static ExecutorService newWorkStealingPool()
    

    创建一个工作窃取线程池,使用所有可用处理器作为其目标并行度级别。

默认情况下,并行级别设置为服务器中的 CPU 内核数。如果您有 4 核 CPU 服务器,线程池大小将为 4。此 API 返回ForkJoinPool类型ExecutorService 并允许通过从 ForkJoinPool 中的繁忙线程中窃取任务来窃取空闲线程的工作。

于 2016-03-02T02:59:59.900 回答
0

问题总结如下:

我想要与缓存线程池完全一样的东西(它按需创建线程,然后在超时后将它们杀死),但它可以创建的线程数有限制,并且一旦它达到它就可以继续排队其他任务线程限制。

在指出解决方案之前,我将解释为什么以下解决方案不起作用:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

当达到 3 的限制时,这不会对任何任务进行排队,因为根据定义, SynchronousQueue 不能容纳任何元素。

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

这不会创建多个线程,因为 ThreadPoolExecutor 仅在队列已满时创建超过 corePoolSize 的线程。但是 LinkedBlockingQueue 永远不会满。

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

在达到 corePoolSize 之前,这不会重用线程,因为 ThreadPoolExecutor 会增加线程数,直到达到 corePoolSize,即使现有线程处于空闲状态。如果您可以忍受这个缺点,那么这是解决问题的最简单方法。这也是“Java Concurrency in Practice”(p172 上的脚注)中描述的解决方案。

所描述问题的唯一完整解决方案似乎是涉及覆盖队列offer方法并编写 a的解决方案,RejectedExecutionHandler如该问题的答案中所述:How to get the ThreadPoolExecutor to increase threads to max before queueing?

于 2019-08-06T08:46:05.403 回答
0

这适用于 Java8+(和其他,现在..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

其中 3 是线程数的限制,5 是空闲线程的超时。

如果您想检查它是否自己工作,这里是完成这项工作的代码:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

上面代码的输出对我来说是

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
于 2019-10-23T13:18:42.637 回答
0

我推荐使用信号方法

来自SignalExecutors类:

如果提供的队列从 offer() 返回 false,ThreadPoolExecutor 只会创建一个新线程。这意味着如果你给它一个无界队列,它只会创建 1 个线程,不管队列有多长。但是,如果您绑定队列并提交比线程更多的可运行文件,您的任务将被拒绝并引发异常。因此,我们创建了一个队列,如果它非空,则该队列将始终返回 false,以确保创建新线程。然后,如果任务被拒绝,我们只需将其添加到队列中。

    public static ExecutorService newCachedBoundedExecutor(final String name, int minThreads, int maxThreads) {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads,
            maxThreads,
            30,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>() {
                @Override
                public boolean offer(Runnable runnable) {
                    if (size() > 1 && size() <= maxThreads) {
                        //create new thread
                        return false;
                    } else {
                        return super.offer(runnable);
                    }
                }
            }, new NumberedThreadFactory(name));

    threadPool.setRejectedExecutionHandler((runnable, executor) -> {
        try {
            executor.getQueue().put(runnable);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });

    return threadPool;
}
于 2021-02-02T15:36:16.000 回答