21

ThreadPoolExecutor的 JavaDoc不清楚是否可以直接将任务添加到BlockingQueue后备执行程序。文档说调用executor.getQueue()“主要用于调试和监控”。

我正在ThreadPoolExecutor用我自己的BlockingQueue. 我保留了对队列的引用,因此可以直接向其中添加任务。返回相同的队列getQueue()所以我假设警告getQueue()适用于通过我的手段获得的支持队列的引用。

例子

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer()对比executor.execute()

据我了解,典型用途是通过executor.execute(). 我上面示例中的方法具有阻塞队列的好处,而execute()如果队列已满并拒绝我的任务,则会立即失败。我也喜欢提交作业与阻塞队列交互;这对我来说感觉更“纯粹”的生产者消费者。

直接将任务添加到队列的含义:我必须调用prestartAllCoreThreads()否则没有工作线程正在运行。假设没有与执行程序的其他交互,则不会监视队列(ThreadPoolExecutor来源检查证实了这一点)。这也意味着对于直接入队,ThreadPoolExecutor必须另外为 > 0 个核心线程进行配置,并且不得将其配置为允许核心线程超时。

tl;博士

给定一个ThreadPoolExecutor配置如下:

  • 核心线程 > 0
  • 核心线程不允许超时
  • 核心线程已预先启动
  • 持有对BlockingQueue支持执行者的引用

将任务直接添加到队列而不是调用是否可以接受executor.execute()

有关的

这个问题(生产者/消费者工作队列)类似,但不具体涉及直接添加到队列中。

4

5 回答 5

11

一个技巧是实现 ArrayBlockingQueue 的自定义子类并覆盖 offer() 方法来调用您的阻塞版本,然后您仍然可以使用正常的代码路径。

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(您可能会猜到,我认为直接在队列中调用 offer 作为您的正常代码路径可能是个坏主意)。

于 2011-04-07T19:32:28.500 回答
10

如果是我,我更喜欢使用Executor#execute()over Queue#offer(),因为我已经在使用其他所有东西java.util.concurrent了。

你的问题很好,它激起了我的兴趣,所以我看了一下来源ThreadPoolExecutor#execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

我们可以看到 execute 本身调用offer()了工作队列,但在必要时进行了一些漂亮、美味的池操作之前不会。出于这个原因,我认为建议使用execute(); 不使用它可能(尽管我不确定)会导致池以非最佳方式运行。但是,我不认为 usingoffer()破坏执行器 - 看起来任务是使用以下(也来自 ThreadPoolExecutor)从队列中拉出的:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

这个getTask()方法只是在一个循环中调用,所以如果执行程序没有关闭,它会阻塞,直到一个新任务被分配给队列(不管它来自哪里)。

注意:即使我在这里发布了源代码片段,我们也不能依赖它们来获得明确的答案——我们应该只对 API 进行编码。我们不知道execute()随着时间的推移,实施将如何变化。

于 2011-04-07T18:53:38.200 回答
4

当队列已满时,可以通过指定RejectedExecutionHandlerat 实例化来实际配置池的行为。ThreadPoolExecutor将四个策略定义为内部类,包括AbortPolicy, DiscardOldestPolicy, DiscardPolicy,以及我个人最喜欢的CallerRunsPolicy,它在控制线程中运行新作业。

例如:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

可以使用以下方法获得问题中所需的行为:

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

在某些时候必须访问队列。这样做的最佳位置是在自包含的 中RejectedExecutionHandler,它可以保存任何代码重复或由于在池对象范围内直接操作队列而产生的潜在错误。请注意,自身包含的处理程序ThreadPoolExecutor使用getQueue().

于 2013-08-30T07:00:25.223 回答
2

如果您使用的队列是与标准内存LinkedBlockingQueueArrayBlockingQueue.

例如,如果您使用不同机器上的多个生产者来实现生产者-消费者模式,并使用基于单独的持久性子系统(如 Redis)的排队机制,那么即使您不这样做,问题本身也会变得相关想要offer()像 OP 这样的阻塞。

因此,prestartAllCoreThreads()必须调用(或足够多次prestartCoreThread())才能使工作线程可用和运行的给定答案非常重要,值得强调。

于 2013-10-25T11:48:45.107 回答
0

如果需要,我们还可以使用将主要处理与拒绝任务分开的停车场 -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();
于 2015-06-17T12:02:11.053 回答