我想创建一个ThreadPoolExecutor
这样,当它达到最大大小并且队列已满时,该submit()
方法会在尝试添加新任务时阻塞。我是否需要为此实现自定义RejectedExecutionHandler
,或者是否有使用标准 Java 库执行此操作的现有方法?
17 回答
我刚刚发现的一种可能的解决方案:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
还有其他解决方案吗?我更喜欢基于的东西,RejectedExecutionHandler
因为它似乎是处理这种情况的标准方法。
您可以使用 ThreadPoolExecutor 和阻塞队列:
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
您应该使用CallerRunsPolicy
,它在调用线程中执行被拒绝的任务。这样,在该任务完成之前,它无法向执行器提交任何新任务,此时将有一些空闲的池线程或该过程将重复。
从文档:
被拒绝的任务
当 Executor 关闭时,以及 Executor 对最大线程和工作队列容量都使用有限的界限并且饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在任何一种情况下,execute 方法都会调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。提供了四个预定义的处理程序策略:
- 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序在拒绝时抛出运行时 RejectedExecutionException。
- 在 ThreadPoolExecutor.CallerRunsPolicy 中,调用 execute 本身的线程运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。
- 在 ThreadPoolExecutor.DiscardPolicy 中,简单地丢弃了无法执行的任务。
- 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果 executor 没有关闭,则丢弃工作队列头部的任务,然后重试执行(可能再次失败,导致重复此操作。)
此外,请确保在调用ThreadPoolExecutor
构造函数时使用有界队列,例如 ArrayBlockingQueue。否则,什么都不会被拒绝。
编辑:响应您的评论,将 ArrayBlockingQueue 的大小设置为等于线程池的最大大小并使用 AbortPolicy。
编辑2:好的,我明白你在说什么。怎么样:重写beforeExecute()
方法以检查getActiveCount()
不超过getMaximumPoolSize()
,如果超过,睡眠并重试?
我知道,这是一个 hack,但在我看来,这里提供的最干净的 hack ;-)
因为 ThreadPoolExecutor 使用阻塞队列“offer”而不是“put”,让我们覆盖阻塞队列的“offer”行为:
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {
BlockingQueueHack(int size) {
super(size);
}
public boolean offer(T task) {
try {
this.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
我测试了它,它似乎工作。实施一些超时策略留作读者练习。
Hibernate 有一个BlockPolicy
简单的,可以做你想做的事:
请参阅:Executors.java
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}
上面从Java Concurrency in Practice 中引用的BoundedExecutor
答案只有在您为 Executor 使用无界队列或信号量限制不大于队列大小时才能正常工作。信号量在提交线程和池中的线程之间是状态共享的,即使队列大小 < 绑定 <= (队列大小 + 池大小),也可以使执行程序饱和。
使用CallerRunsPolicy
仅在您的任务不会永远运行时才有效,在这种情况下,您的提交线程将rejectedExecution
永远保留,如果您的任务需要很长时间才能运行,这是一个坏主意,因为提交线程无法提交任何新任务或如果它本身正在运行任务,请执行其他任何操作。
如果这不可接受,那么我建议在提交任务之前检查执行程序的有界队列的大小。如果队列已满,请稍等片刻,然后再次尝试提交。吞吐量会受到影响,但我建议它比许多其他建议的解决方案更简单,并且您可以保证不会拒绝任何任务。
下面的类包裹了一个 ThreadPoolExecutor 并使用一个 Semaphore 来阻塞,然后工作队列已满:
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
这个包装类基于 Brian Goetz 的 Java Concurrency in Practice 一书中给出的解决方案。书中的解决方案只需要两个构造函数参数:一个Executor
和一个用于信号量的边界。这显示在 Fixpoint 给出的答案中。这种方法存在一个问题:它可能会处于池线程繁忙、队列已满,但信号量刚刚释放许可的状态。(semaphore.release()
在 finally 块中)。在这种状态下,一个新任务可以抢到刚刚释放的permit,但是因为任务队列已满而被拒绝。当然,这不是你想要的;在这种情况下你想阻止。
为了解决这个问题,我们必须使用无界队列,正如 JCiP 明确提到的那样。信号量充当守卫,产生虚拟队列大小的效果。这具有副作用,即单元可能包含maxPoolSize + virtualQueueSize + maxPoolSize
任务。这是为什么?因为
semaphore.release()
在 finally 块中。如果所有池线程同时调用该语句,则maxPoolSize
释放许可,允许相同数量的任务进入单元。如果我们使用有界队列,它仍然是满的,导致任务被拒绝。现在,因为我们知道这只发生在池线程几乎完成时,所以这不是问题。我们知道池线程不会阻塞,所以很快就会从队列中取出一个任务。
不过,您可以使用有界队列。只要确保它的大小等于virtualQueueSize + maxPoolSize
. 更大的尺寸是没有用的,信号量会阻止让更多的项目进入。更小的尺寸会导致任务被拒绝。任务被拒绝的机会随着大小的减小而增加。例如,假设您想要一个 maxPoolSize=2 和 virtualQueueSize=5 的有界执行程序。然后使用 5+2=7 许可和 5+2=7 的实际队列大小的信号量。那么单元中可以执行的实际任务数为 2+5+2=9。当 executor 已满(队列中有 5 个任务,线程池中有 2 个,因此可用的许可为 0)并且所有池线程释放它们的许可时,进入的任务恰好可以获取 2 个许可。
现在,JCiP 的解决方案使用起来有些麻烦,因为它不强制执行所有这些约束(无界队列,或受这些数学限制等)。我认为这只是一个很好的例子来演示如何基于已经可用的部分构建新的线程安全类,而不是作为一个成熟的、可重用的类。我不认为后者是作者的意图。
您可以像这样使用自定义 RejectedExecutionHandler
ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
max_handlers, // max size
timeout_in_seconds, // idle timeout
TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// This will block if the queue is full
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
});
我并不总是喜欢 CallerRunsPolicy,尤其是因为它允许被拒绝的任务“跳过队列”并在之前提交的任务之前执行。此外,在调用线程上执行任务可能需要比等待第一个插槽可用更长的时间。
我使用自定义的 RejectedExecutionHandler 解决了这个问题,它只是将调用线程阻塞了一会儿,然后再次尝试提交任务:
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
此类只能在线程池执行器中用作 RejectedExecutinHandler ,例如:
executorPool = new ThreadPoolExecutor(1, 1, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new BlockWhenQueueFull());
我看到的唯一缺点是调用线程可能会被锁定比严格必要的时间稍长(最多 250 毫秒)。此外,由于此执行程序实际上是递归调用的,因此等待线程变得可用的时间很长(数小时)可能会导致堆栈溢出。
不过,我个人喜欢这种方法。它结构紧凑、易于理解且运行良好。
创建您自己的阻塞队列以供 Executor 使用,具有您正在寻找的阻塞行为,同时始终返回可用的剩余容量(确保 executor 不会尝试创建比其核心池更多的线程,或触发拒绝处理程序)。
我相信这会让你得到你正在寻找的阻塞行为。拒绝处理程序永远不会符合要求,因为这表明执行者无法执行任务。我可以设想的是,您在处理程序中会遇到某种形式的“忙于等待”。这不是你想要的,你想要一个阻塞调用者的执行者队列......
避免@FixPoint 解决方案出现问题。可以使用 ListeningExecutorService 并在 FutureCallback 中释放信号量 onSuccess 和 onFailure。
最近我发现这个问题有同样的问题。OP 没有明确说明,但我们不想使用RejectedExecutionHandler
在提交者线程上执行任务的 ,因为如果该任务是长时间运行的任务,这将未充分利用工作线程。
阅读所有答案和评论,特别是使用信号量或使用有缺陷的解决方案,afterExecute
我仔细查看了ThreadPoolExecutor的代码,看看是否有出路。惊讶地看到有2000多行(注释)代码,其中一些让我感到头晕目眩。鉴于我实际上有一个相当简单的要求——一个生产者,几个消费者,当没有消费者可以工作时让生产者阻塞——我决定推出自己的解决方案。它不是一个,ExecutorService
而只是一个Executor
。而且它不会根据工作负载调整线程数,而是只保留固定数量的线程,这也符合我的要求。这是代码。随意抱怨它:-)
package x;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
/**
* distributes {@code Runnable}s to a fixed number of threads. To keep the
* code lean, this is not an {@code ExecutorService}. In particular there is
* only very simple support to shut this executor down.
*/
public class ParallelExecutor implements Executor {
// other bounded queues work as well and are useful to buffer peak loads
private final BlockingQueue<Runnable> workQueue =
new SynchronousQueue<Runnable>();
private final Thread[] threads;
/*+**********************************************************************/
/**
* creates the requested number of threads and starts them to wait for
* incoming work
*/
public ParallelExecutor(int numThreads) {
this.threads = new Thread[numThreads];
for(int i=0; i<numThreads; i++) {
// could reuse the same Runner all over, but keep it simple
Thread t = new Thread(new Runner());
this.threads[i] = t;
t.start();
}
}
/*+**********************************************************************/
/**
* returns immediately without waiting for the task to be finished, but may
* block if all worker threads are busy.
*
* @throws RejectedExecutionException if we got interrupted while waiting
* for a free worker
*/
@Override
public void execute(Runnable task) {
try {
workQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("interrupt while waiting for a free "
+ "worker.", e);
}
}
/*+**********************************************************************/
/**
* Interrupts all workers and joins them. Tasks susceptible to an interrupt
* will preempt their work. Blocks until the last thread surrendered.
*/
public void interruptAndJoinAll() throws InterruptedException {
for(Thread t : threads) {
t.interrupt();
}
for(Thread t : threads) {
t.join();
}
}
/*+**********************************************************************/
private final class Runner implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
Runnable task;
try {
task = workQueue.take();
} catch (InterruptedException e) {
// canonical handling despite exiting right away
Thread.currentThread().interrupt();
return;
}
try {
task.run();
} catch (RuntimeException e) {
// production code to use a logging framework
e.printStackTrace();
}
}
}
}
}
我相信有一种非常优雅的方法可以通过使用java.util.concurrent.Semaphore
和委托行为来解决这个问题Executor.newFixedThreadPool
。新的执行器服务只会在有线程执行时才会执行新任务。阻塞由 Semaphore 管理,许可数等于线程数。任务完成后,它会返回一个许可。
public class FixedThreadBlockingExecutorService extends AbstractExecutorService {
private final ExecutorService executor;
private final Semaphore blockExecution;
public FixedThreadBlockingExecutorService(int nTreads) {
this.executor = Executors.newFixedThreadPool(nTreads);
blockExecution = new Semaphore(nTreads);
}
@Override
public void shutdown() {
executor.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}
@Override
public boolean isShutdown() {
return executor.isShutdown();
}
@Override
public boolean isTerminated() {
return executor.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
blockExecution.acquireUninterruptibly();
executor.execute(() -> {
try {
command.run();
} finally {
blockExecution.release();
}
});
}
过去我也有同样的需求:一种由共享线程池支持的每个客户端都有固定大小的阻塞队列。我最终编写了自己的 ThreadPoolExecutor:
UserThreadPoolExecutor(阻塞队列(每个客户端)+线程池(在所有客户端之间共享))
见:https ://github.com/d4rxh4wx/UserThreadPoolExecutor
每个 UserThreadPoolExecutor 都从共享的 ThreadPoolExecutor 中获得最大线程数
每个 UserThreadPoolExecutor 可以:
- 如果未达到配额,则将任务提交给共享线程池执行器。如果达到其配额,则将作业排队(非消耗性阻塞等待 CPU)。一旦其提交的任务之一完成,配额就会减少,允许另一个等待提交到 ThreadPoolExecutor 的任务
- 等待剩余任务完成
我在弹性搜索客户端中找到了这个拒绝策略。它在阻塞队列上阻塞调用者线程。下面的代码 -
static class ForceQueuePolicy implements XRejectedExecutionHandler
{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try
{
executor.getQueue().put(r);
}
catch (InterruptedException e)
{
//should never happen since we never wait
throw new EsRejectedExecutionException(e);
}
}
@Override
public long rejected()
{
return 0;
}
}
我最近需要实现类似的东西,但是在ScheduledExecutorService
.
我还必须确保我处理了方法上传递的延迟,并确保任务提交以在调用者期望的时间执行,或者只是失败从而抛出一个RejectedExecutionException
.
ScheduledThreadPoolExecutor
执行或提交任务的其他方法在内部调用#schedule
,这仍将依次调用被覆盖的方法。
import java.util.concurrent.*;
public class BlockingScheduler extends ScheduledThreadPoolExecutor {
private final Semaphore maxQueueSize;
public BlockingScheduler(int corePoolSize,
ThreadFactory threadFactory,
int maxQueueSize) {
super(corePoolSize, threadFactory, new AbortPolicy());
this.maxQueueSize = new Semaphore(maxQueueSize);
}
@Override
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
}
@Override
protected void afterExecute(Runnable runnable, Throwable t) {
super.afterExecute(runnable, t);
try {
if (t == null && runnable instanceof Future<?>) {
try {
((Future<?>) runnable).get();
} catch (CancellationException | ExecutionException e) {
t = e;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
System.err.println(t);
}
} finally {
releaseQueueUsage();
}
}
private long beforeSchedule(Runnable runnable, long delay) {
try {
return getQueuePermitAndModifiedDelay(delay);
} catch (InterruptedException e) {
getRejectedExecutionHandler().rejectedExecution(runnable, this);
return 0;
}
}
private long beforeSchedule(Callable callable, long delay) {
try {
return getQueuePermitAndModifiedDelay(delay);
} catch (InterruptedException e) {
getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
return 0;
}
}
private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
final long beforeAcquireTimeStamp = System.currentTimeMillis();
maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
final long afterAcquireTimeStamp = System.currentTimeMillis();
return afterAcquireTimeStamp - beforeAcquireTimeStamp;
}
private void releaseQueueUsage() {
maxQueueSize.release();
}
}
我有代码here,将不胜感激任何反馈。 https://github.com/AmitabhAwasthi/BlockingScheduler
这是似乎非常有效的解决方案。它被称为NotifyingBlockingThreadPoolExecutor。
编辑:这段代码有问题,await() 方法有问题。调用 shutdown() + awaitTermination() 似乎工作正常。