69

我正在尝试使用 ThreadPoolExecutor 执行大量任务。下面是一个假设的例子:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

问题是我很快就得到了一个,java.util.concurrent.RejectedExecutionException因为任务的数量超过了工作队列的大小。但是,我正在寻找的期望行为是让主线程阻塞,直到队列中有空间。实现这一目标的最佳方法是什么?

4

8 回答 8

76

在一些非常狭窄的情况下,您可以实现一个 java.util.concurrent.RejectedExecutionHandler 来满足您的需要。

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

现在。这是一个非常糟糕的主意,原因如下

  • 它很容易出现死锁,因为池中的所有线程都可能在您放入队列中的东西可见之前就死了。通过设置合理的保持活动时间来缓解这种情况。
  • 任务没有按照您的 Executor 期望的方式包装。许多执行器实现在执行之前将它们的任务包装在某种跟踪对象中。看你的来源。
  • API 强烈反对通过 getQueue() 添加,并且在某些时候可能会被禁止。

一个几乎总是更好的策略是安装 ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用 execute() 的线程上运行任务来限制您的应用程序。

但是,有时,具有所有固有风险的阻止策略确实是您想要的。我会说在这些条件下

  • 你只有一个线程调用 execute()
  • 你必须(或想要)有一个非常小的队列长度
  • 您绝对需要限制运行此工作的线程数(通常是出于外部原因),而调用者运行策略会破坏这一点。
  • 您的任务大小不可预测,因此如果池暂时忙于 4 个短任务并且您的一个线程调用 execute 被一个大任务卡住,调用者运行可能会引入饥饿。

所以,正如我所说。它很少需要并且可能很危险,但是你去。

祝你好运。

于 2010-08-19T03:47:45.967 回答
17

您需要做的是将 ThreadPoolExecutor 包装到 Executor 中,这明确限制了其中并发执行的操作数量:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

您可以将 concurrentTasksLimit 调整为委托执行程序的 threadPoolSize + queueSize,它几乎可以解决您的问题

于 2018-05-16T01:42:10.057 回答
8

您可以使用 asemaphore来阻止线程进入池。

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

一些陷阱

  • 仅将此模式与固定线程池一起使用。队列不太可能经常被填满,因此不会创建新线程。查看 ThreadPoolExecutor 上的 java 文档以获取更多详细信息:https ://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html有一种解决方法,但它已经超出了这个答案的范围。
  • 队列大小应高于核心线程数。如果我们将队列大小设为 3,最终会发生什么:

    • T0:三个线程都在工作,队列为空,没有可用的许可。
    • T1:线程 1 结束,释放许可。
    • T2:线程 1 轮询队列寻找新工作,没有找到,然后等待
    • T3:主线程将工作提交到池中,线程1开始工作。

    上面的例子转换为线程主线程阻塞线程 1。它可能看起来像一个小周期,但现在将频率乘以天和月。突然之间,短时间加起来浪费了大量时间。

于 2018-03-26T00:28:21.310 回答
6

这就是我最终做的事情:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}
于 2018-05-29T21:04:12.727 回答
2

一个相当简单的选择是用一个在被调用时调用BlockingQueue的实现来包装你:put(..)offer(..)

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

这是有效的,因为默认情况下put(..)会等到队列中的容量已满时,请参阅

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

无需捕获RejectedExecutionException或复杂的锁定。

于 2019-11-05T09:34:17.270 回答
1

这是我在这种情况下的代码片段:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool 是已经初始化的 java.util.concurrent.ExecutorService 的本地实例。

于 2017-05-31T08:11:54.577 回答
0

好的,旧线程,但这是我在搜索阻塞线程执行程序时发现的。当任务提交到任务队列时,我的代码尝试获取信号量。如果没有剩余信号量,这将阻塞。一旦任务完成,装饰器就会释放信号量。可怕的部分是可能会丢失信号量,但这可以通过例如定时作业来解决,它只是在定时的基础上清除信号量。

所以这里是我的解决方案:

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    companion object {
        lateinit var semaphore: Semaphore
    }

    init {
        semaphore = Semaphore(concurrency)
        val semaphoreTaskDecorator = SemaphoreTaskDecorator()
        this.setTaskDecorator(semaphoreTaskDecorator)
    }

    override fun <T> submit(task: Callable<T>): Future<T> {
        log.debug("submit")
        semaphore.acquire()
        return super.submit(task)
    }
}

private class SemaphoreTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        log.debug("decorate")
        return Runnable {
            try {
                runnable.run()
            } finally {
                log.debug("decorate done")
                semaphore.release()
            }
        }
    }
}
于 2020-04-23T14:37:38.537 回答
0

我使用自定义的 RejectedExecutionHandler 解决了这个问题,它只是将调用线程阻塞了一会儿,然后再次尝试提交任务:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

此类只能在线程池执行程序中用作 RejectedExecutionHandler ,就像任何其他类一样。在这个例子中:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

我看到的唯一缺点是调用线程可能会被锁定比严格必要的时间稍长(最多 250 毫秒)。对于许多短期运行的任务,可能会将等待时间减少到 10 毫秒左右。此外,由于此执行程序实际上是递归调用的,因此等待线程变得可用的时间很长(数小时)可能会导致堆栈溢出。

不过,我个人喜欢这种方法。它结构紧凑、易于理解且运行良好。我错过了什么重要的东西吗?

于 2017-11-06T22:08:41.490 回答