64

我不能使用shutdown()awaitTermination()因为它可能会在等待时将新任务添加到 ThreadPoolExecutor 中。

所以我正在寻找一种方法来等到 ThreadPoolExecutor 清空它的队列并完成它的所有任务,而不停止在此之前添加新任务。

如果它有任何区别,这适用于Android。

谢谢

更新:很多周后重新访问后,我发现修改后的 CountDownLatch 在这种情况下对我来说效果更好。我会保留答案,因为它更适用于我的要求。

4

7 回答 7

74

如果您想知道某项任务或某批任务何时完成,您可以使用ExecutorService.submit(Runnable). 调用此方法会返回一个Future对象,该对象可能会被放入一个对象中,Collection然后您的主线程将迭代调用Future.get()每个对象。这将导致您的主线程停止执行,直到ExecutorService处理完所有Runnable任务。

Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
    future.get();
}
于 2010-10-14T02:00:37.020 回答
8

My Scenario 是一个网络爬虫,用于从网站获取一些信息,然后对其进行处理。一个 ThreadPoolExecutor 用于加快进程,因为可以同时加载许多页面。因此,新任务将在现有任务中创建,因为爬虫将跟踪每个页面中的超链接。问题是一样的:主线程不知道什么时候所有的任务都完成了,它可以开始处理结果。我使用一种简单的方法来确定这一点。它不是很优雅,但在我的情况下有效:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
于 2012-02-23T01:42:41.780 回答
6

也许您正在寻找CompletionService来管理批量任务,另请参阅此答案

于 2010-10-14T02:04:17.253 回答
3

(这是尝试通过我自己的调整来重现 Thilo 之前删除的答案。)

我认为你可能需要澄清你的问题,因为有一个隐含的无限条件......在某些时候你必须决定关闭你的执行者,那时它不会接受任何更多的任务。您的问题似乎暗示您想等到您知道不会提交进一步的任务,您只能在自己的应用程序代码中知道。

以下答案将允许您顺利过渡到新 TPE(无论出于何种原因),完成所有当前提交的任务,并且不会拒绝新 TPE 的新任务。它可能会回答你的问题。@Thilo 也可能。

假设您在某处定义了一个可见的 TPE,如下所示:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

然后,您可以这样编写 TPE 交换例程。也可以使用同步方法编写,但我认为这更简单:

void rotateTPE()
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

或者,如果您真的不想杀死线程池,那么您的提交者端将需要在某些时候处理被拒绝的任务,您可以使用null新的 TPE:

void killTPE()
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

这可能会导致上游问题,调用者需要知道如何处理null.

您也可以用一个简单地拒绝每个新执行的虚拟 TPE 进行交换,但这相当于调用shutdown()TPE 时发生的情况。

于 2010-10-14T12:45:19.333 回答
1

如果您不想使用shutdown,请遵循以下方法:

  1. 遍历Future从提交开始的所有任务,并按照建议检查对象ExecutorService阻塞调用的get()状态FutureTim Bender

  2. 使用其中之一

    1. 使用invokeAll _ExecutorService
    2. 使用CountDownLatch
    3. 使用ForkJoinPoolnewWorkStealingPoolExecutors从 java 8 开始)

invokeAll()on executor service 也达到了同样的目的CountDownLatch

相关的 SE 问题:

如何等待多个线程完成?

于 2016-04-19T12:21:19.400 回答
0

您可以在Runner类上调用waitTillDone() :

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

要使用它,请将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.0'

有关更多详细信息,请查看此处:https ://github.com/MatejTymes/JavaFixes或此处: http: //matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

于 2016-04-29T01:32:48.337 回答
0

尝试使用队列大小和活动任务计数,如下所示

 while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
                     try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }
于 2019-08-16T14:28:44.757 回答