3

我有下面的代码片段,运行良好。但问题是它立即创建并将超过 2000 个任务放在执行程序队列中。

我需要检查执行程序队列中已经完成的任务是否完成,然后再给它更多任务。不一定要准确,即如果队列剩余的任务少于 10 个,则再添加 50 个。

所以executor任务队列没有那么多待处理的任务,这样也可以让shutdown()及时工作,否则即使被调用,executor仍然会尝试先完成其队列中的所有2000个任务。

实现这一目标的最佳方法是什么?谢谢你

executor = Executors.newFixedThreadPool(numThreads);

while(some_condition==true)
{
    //if(executor < 10 tasks pending)  <---- how do i do this?
    //{                             
        for(int k=0;k<20;k++)
        {  
            Runnable worker = new MyRunnable();
            executor.execute(worker);
        }
    //}
    //else 
    //{
    //      wait(3000);
    //}
} 

使用信号量更新:

private final Semaphore semaphore = new Semaphore(10)
executor = new ThreadPoolExecutorWithSemaphoreFromJohnExample();

while(some_condition==true)
{

        Runnable worker = new MyRunnable();
        //So at this point if semaphore is full, then while loop would PAUSE(??) until
        //semaphore frees up again.
          executor.execute(worker);   
} 
4

4 回答 4

8

我有下面的代码片段,运行良好。但问题是它立即创建并将超过 2000 个任务放在执行程序队列中。

一种方法是ThreadPoolExecutor使用有限的作业队列创建您自己的并RejectedExecutionHandler在其上设置自定义。这使您可以细粒度地控制要排队的作业数量。

您需要自定义处理程序,因为默认情况下,如果队列已满,ThreadPoolExecutor.submit(...)则会抛出RejectedExecutionException. 使用下面的自定义处理程序,当它被队列拒绝时,拒绝处理程序只是将其放回原处,阻塞直到队列有空间。所以没有工作会被拒绝/放弃。

下面大致介绍了如何启动自己的线程池并设置自己的拒绝处理程序。

// you can tune the blocking queue size which is the number of jobs to queue
// when the NUM_THREADS are all working
final BlockingQueue<MyRunnable> queue =
    new ArrayBlockingQueue<MyRunnable>(NUM_JOBS_TO_QUEUE);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the job that fills the queue, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full as opposed to throwing
      executor.getQueue().put(r);
   }
});
...
// now submit all of your jobs and it will block if the queue is full
for(int k = 0; k < 20000000; k++) {  
   Runnable worker = new MyRunnable();
   threadPool.execute(worker);
}

有关阻塞线程池的更多详细信息,请参见我的答案:

如果需要处理的数据太多,如何让 ThreadPoolExecutor 命令等待?

您还可以使用ThreadPoolExecutor.CallerRunsPolicywhich 会导致将作业提交到线程池的调用者执行作业。但是我不喜欢这种解决方案,因为它会阻塞调用者,直到作业完成,这可能会使其他工作线程饿死。此外,如果有多个提交者,可能仍会导致运行作业的线程过多。

最后,请注意我将核心和最大线程数设置ThreadPoolExecutor为相同的数字。不幸的是,默认情况下,执行程序会启动核心线程,然后填充队列,然后才会分配额外的线程,直到达到最大值。这完全违反直觉。

于 2012-07-19T20:32:39.397 回答
7

您可以使用简单的信号量。提交后获得新的许可证,完成后释放许可证,以允许其他等待提交的人。

private final Semaphore semaphore = new Semaphore(10);//or however you want max queued at any given moment
ThreadPoolExecutor tp= new ThreadPoolExecutor(...){
      public void execute(Runnable r){
          semaphore.acquire();
          super.execute(r);
      }    
      public void afterExecute(Runnable r, Thread t){
         semaphore.release();  
         super.afterExecute(r,t);
      }
};

因此,如果没有更多可用的许可,这里的提交线程将被暂停。

于 2012-07-19T21:04:10.387 回答
3

我通常通过为任务对象使用对象“池队列”来限制此类系统 - 一个在启动时填充了 X 个任务的 BlockingQueue。任何想要向线程提交任务的东西都必须从池队列中获取一个,用数据加载它,然后提交它。

当任务完成并导致它被处理时,它会被推回池队列以供重用。

如果池为空,提交线程会阻塞在池队列中,直到返回一些任务。

这本质上是@John Vint 所建议的信号量控制的一种形式,但具有一些进一步的优点 - 例如,无需持续创建/GC 可运行对象。我喜欢将 PooolQueue.size 转储到计时器上的 GUI 状态栏,这样我就可以看到系统有多“忙”(并且还可以快速检测任何对象泄漏:)

于 2012-07-20T07:55:46.597 回答
2

你会更好地设置拒绝策略,因为你不想压倒线程池,我发现在不让自己复杂化的情况下完成此操作的最佳方法是执行以下操作:

final ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(THREADS_COUNT);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

会发生的是,一旦所有线程都忙,调用者的线程将执行任务。这是对此类策略CallerRunsPolicy JavaDoc的参考

于 2013-07-29T15:28:47.537 回答