91

我正在尝试编写一个解决方案,其中单个线程产生可以并行执行的 I/O 密集型任务。每个任务都有重要的内存数据。所以我希望能够限制当前待处理的任务数量。

如果我这样创建 ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

然后当队列填满并且所有线程都已经忙时executor.submit(callable)抛出。RejectedExecutionException

executor.submit(callable)当队列已满且所有线程都忙时,我该怎么做才能阻塞?

编辑:我试过这个

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

它在某种程度上达到了我想要达到的效果,但是以一种不优雅的方式(基本上被拒绝的线程在调用线程中运行,所以这会阻止调用线程提交更多)。

编辑:(提出问题5年后)

对于阅读此问题及其答案的任何人,请不要将已接受的答案视为一种正确的解决方案。请通读所有答案和评论。

4

8 回答 8

66

我也做过同样的事情。诀窍是创建一个 BlockingQueue,其中 offer() 方法实际上是 put()。(你可以使用任何你想要的基本 BlockingQueue impl)。

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

请注意,这仅适用于线程池,corePoolSize==maxPoolSize因此在此处要小心(请参阅注释)。

于 2010-12-23T20:56:30.597 回答
16

当前接受的答案有一个潜在的重大问题 - 它改变了 ThreadPoolExecutor.execute 的行为,因此如果您有corePoolSize < maxPoolSize, ThreadPoolExecutor 逻辑将永远不会在核心之外添加额外的工作人员。

ThreadPoolExecutor .execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

具体来说,最后一个“其他”块将永远不会被击中。

一个更好的选择是做一些类似于 OP 已经在做的事情 - 使用RejectedExecutionHandler来做同样的put逻辑:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

正如评论中指出的那样(参考这个答案),这种方法有一些需要注意的地方:

  1. 如果corePoolSize==0,则存在竞争条件,池中的所有线程都可能在任务可见之前死亡
  2. 使用包装队列任务的实现(不适用于ThreadPoolExecutor)将导致问题,除非处理程序也以相同的方式包装它。

记住这些问题,这个解决方案将适用于大多数典型的 ThreadPoolExecutors,并且可以正确处理corePoolSize < maxPoolSize.

于 2015-08-20T16:27:04.323 回答
15

这是我最终解决这个问题的方法:

(注意:此解决方案确实阻止了提交 Callable 的线程,因此它可以防止 RejectedExecutionException 被抛出)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
于 2014-06-26T00:45:25.540 回答
5

我知道这是一个老问题,但有一个类似的问题,即创建新任务非常快,如果因为现有任务完成速度不够快而发生太多 OutOfMemoryError。

在我的情况下Callables,我需要结果,因此我需要存储所有Futures返回的executor.submit(). 我的解决方案是将其Futures放入BlockingQueue最大尺寸的 a 中。一旦该队列已满,在某些任务完成(从队列中删除元素)之前不会再生成任务。在伪代码中:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(future);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();
    
    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future future = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
于 2013-07-17T11:23:23.297 回答
5

CallerBlocksPolicy如果您使用的是spring-integration,如何使用该类?

此类实现RejectedExecutionHandler接口,该接口是一个处理程序,用于处理无法由ThreadPoolExecutor.

您可以像这样使用此策略。

executor.setRejectedExecutionHandler(new CallerBlocksPolicy());

CallerBlocksPolicy和之间的主要区别在于CallerRunsPolicy它是在调用者线程中阻塞还是运行任务。

请参考此代码

于 2021-02-04T19:28:55.580 回答
2

我遇到了类似的问题,我通过使用来自以下的beforeExecute/afterExecute钩子来实现它ThreadPoolExecutor

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

这对你来说应该足够好了。顺便说一句,最初的实现是基于任务大小的,因为一个任务可能比另一个任务大 100 倍,并且提交两个巨大的任务正在杀死盒子,但是运行一个大任务和大量小任务是可以的。如果您的 I/O 密集型任务的大小大致相同,您可以使用此类,否则请告诉我,我将发布基于大小的实现。

PS你会想检查ThreadPoolExecutorjavadoc。Doug Lea 提供的关于如何轻松定制的非常好的用户指南。

于 2010-12-24T02:25:20.003 回答
2

我已经按照装饰器模式实现了一个解决方案,并使用信号量来控制执行任务的数量。您可以将它与任何Executor和:

  • 指定正在进行的任务的最大值
  • 指定等待任务执行permit的最大超时时间(如果超时没有获取permit,RejectedExecutionException则抛出a)
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}
于 2018-03-19T21:53:21.997 回答
1

I think it is as simple as using a ArrayBlockingQueue instead of a a LinkedBlockingQueue.

Ignore me... that's totally wrong. ThreadPoolExecutor calls Queue#offer not put which would have the effect you require.

You could extend ThreadPoolExecutor and provide an implementation of execute(Runnable) that calls put in place of offer.

That doesn't seem like a completely satisfactory answer I'm afraid.

于 2010-12-23T19:59:48.630 回答