ThreadPoolExecutor
在启动更多线程之前,队列需要有界和满的位置,我该如何解决这个限制。
我相信我终于找到了一个有点优雅(也许有点老套)的解决方案来解决这个限制ThreadPoolExecutor
。它涉及扩展LinkedBlockingQueue
以使其在已经有一些任务排队时返回false
。queue.offer(...)
如果当前线程跟不上排队的任务,TPE 将添加额外的线程。如果池已经处于最大线程,则将RejectedExecutionHandler
调用put(...)
进入队列的线程。
offer(...)
编写一个可以返回false
并且从不阻塞的队列当然很奇怪,put()
所以这就是黑客部分。但这适用于 TPE 对队列的使用,所以我认为这样做没有任何问题。
这是代码:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
有了这个机制,当我向队列提交任务时,ThreadPoolExecutor
会:
- 最初将线程数扩展到核心大小(此处为 1)。
- 将其提供给队列。如果队列为空,它将排队等待由现有线程处理。
- 如果队列已经有 1 个或多个元素,
offer(...)
则将返回 false。
- 如果返回 false,则增加池中的线程数,直到它们达到最大数量(此处为 50)。
- 如果在最大值,那么它调用
RejectedExecutionHandler
- 然后
RejectedExecutionHandler
将任务放入队列中,由第一个可用线程按 FIFO 顺序处理。
尽管在我上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果您向 1000 添加容量,LinkedBlockingQueue
那么它将:
- 将线程扩展到最大
- 然后排队,直到它满了 1000 个任务
- 然后阻塞调用者,直到队列有可用空间。
此外,如果您需要offer(...)
在
RejectedExecutionHandler
then 中使用,您可以使用该offer(E, long, TimeUnit)
方法而不是 withLong.MAX_VALUE
作为超时。
警告:
如果您希望在执行程序关闭后将任务添加到执行程序,那么您可能希望更聪明地在执行程序服务关闭时抛出RejectedExecutionException
我们的自定义。RejectedExecutionHandler
感谢@RaduToader 指出这一点。
编辑:
对这个答案的另一个调整可能是询问 TPE 是否有空闲线程,并且只有在有空闲线程时才将项目入队。您必须为此创建一个真正的类并ourQueue.setThreadPoolExecutor(tpe);
在其上添加方法。
那么您的offer(...)
方法可能类似于:
- 检查是否
tpe.getPoolSize() == tpe.getMaximumPoolSize()
在这种情况下只需调用super.offer(...)
.
- Else if
tpe.getPoolSize() > tpe.getActiveCount()
then 调用super.offer(...)
,因为似乎有空闲线程。
- 否则返回
false
fork 另一个线程。
也许是这样:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
请注意,TPE 上的 get 方法很昂贵,因为它们访问volatile
字段或(在 的情况下getActiveCount()
)锁定 TPE 并遍历线程列表。此外,这里的竞争条件可能会导致任务不正确地入队或在有空闲线程时分叉另一个线程。