38

为什么,哦,为什么不java.util.concurrent为其ExecutorServices 提供队列长度指示器?最近我发现自己在做这样的事情:

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...

public void addTaskToQueue(Runnable runnable) {
    if (queueLength.get() < MAX_QUEUE_LENGTH) {
        queueLength.incrementAndGet(); // Increment queue when submitting task.
        queue.submit(new Runnable() {
            public void run() {
                runnable.run();
                queueLength.decrementAndGet(); // Decrement queue when task done.
            }
        });
    } else {
        // Trigger error: too long queue
    }
}

哪个工作正常,但是...我认为这确实应该作为ExecutorService. 携带与实际队列分离的计数器是愚蠢且容易出错的,计数器应该指示其长度(让我想起 C 数组)。但是,ExecutorServices 是通过静态工厂方法获得的,因此无法简单地扩展原本出色的单线程执行器并添加队列计数器。所以我该怎么做:

  1. 重新发明已经在 J​​DK 中实现的东西?
  2. 其他聪明的解决方案?
4

2 回答 2

65

还有一种更直接的方法:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();

尽管您可能会考虑不使用 Executor 的便捷创建方法,而是直接创建 executor 以摆脱强制转换,从而确保 executor 实际上始终是 a ThreadPoolExecutor,即使Executors.newSingleThreadExecutor某天的实现会发生变化。

ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );

这是直接从Executors.newSingleThreadExecutorJDK 1.6 中复制而来的。LinkedBlockingQueue传递给构造函数的实际上就是您将从中返回的对象getQueue

于 2010-02-15T14:08:40.260 回答
4

虽然您可以直接检查队列大小。处理太长的队列的另一种方法是使内部队列有界。

public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                              5000L, TimeUnit.MILLISECONDS,
                              new ArrayBlockingQueue<Runnable>(queueSize, true));
}

当您超过限制时,这将导致 RejectedExecutionExceptions(请参阅此)。

如果要避免异常,可以劫持调用线程来执行该函数。请参阅此 SO question以获取解决方案。

参考

于 2016-12-06T20:40:56.337 回答