我不能使用shutdown()
,awaitTermination()
因为它可能会在等待时将新任务添加到 ThreadPoolExecutor 中。
所以我正在寻找一种方法来等到 ThreadPoolExecutor 清空它的队列并完成它的所有任务,而不停止在此之前添加新任务。
如果它有任何区别,这适用于Android。
谢谢
更新:很多周后重新访问后,我发现修改后的 CountDownLatch 在这种情况下对我来说效果更好。我会保留答案,因为它更适用于我的要求。
我不能使用shutdown()
,awaitTermination()
因为它可能会在等待时将新任务添加到 ThreadPoolExecutor 中。
所以我正在寻找一种方法来等到 ThreadPoolExecutor 清空它的队列并完成它的所有任务,而不停止在此之前添加新任务。
如果它有任何区别,这适用于Android。
谢谢
更新:很多周后重新访问后,我发现修改后的 CountDownLatch 在这种情况下对我来说效果更好。我会保留答案,因为它更适用于我的要求。
如果您想知道某项任务或某批任务何时完成,您可以使用ExecutorService.submit(Runnable)
. 调用此方法会返回一个Future
对象,该对象可能会被放入一个对象中,Collection
然后您的主线程将迭代调用Future.get()
每个对象。这将导致您的主线程停止执行,直到ExecutorService
处理完所有Runnable
任务。
Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
future.get();
}
My Scenario 是一个网络爬虫,用于从网站获取一些信息,然后对其进行处理。一个 ThreadPoolExecutor 用于加快进程,因为可以同时加载许多页面。因此,新任务将在现有任务中创建,因为爬虫将跟踪每个页面中的超链接。问题是一样的:主线程不知道什么时候所有的任务都完成了,它可以开始处理结果。我使用一种简单的方法来确定这一点。它不是很优雅,但在我的情况下有效:
while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
也许您正在寻找CompletionService来管理批量任务,另请参阅此答案。
(这是尝试通过我自己的调整来重现 Thilo 之前删除的答案。)
我认为你可能需要澄清你的问题,因为有一个隐含的无限条件......在某些时候你必须决定关闭你的执行者,那时它不会接受任何更多的任务。您的问题似乎暗示您想等到您知道不会提交进一步的任务,您只能在自己的应用程序代码中知道。
以下答案将允许您顺利过渡到新 TPE(无论出于何种原因),完成所有当前提交的任务,并且不会拒绝新 TPE 的新任务。它可能会回答你的问题。@Thilo 也可能。
假设您在某处定义了一个可见的 TPE,如下所示:
AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;
然后,您可以这样编写 TPE 交换例程。也可以使用同步方法编写,但我认为这更简单:
void rotateTPE()
{
ThreadPoolExecutor newTPE = createNewTPE();
// atomic swap with publicly-visible TPE
ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
oldTPE.shutdown();
// and if you want this method to block awaiting completion of old tasks in
// the previously visible TPE
oldTPE.awaitTermination();
}
或者,如果您真的不想杀死线程池,那么您的提交者端将需要在某些时候处理被拒绝的任务,您可以使用null
新的 TPE:
void killTPE()
{
ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
oldTPE.shutdown();
// and if you want this method to block awaiting completion of old tasks in
// the previously visible TPE
oldTPE.awaitTermination();
}
这可能会导致上游问题,调用者需要知道如何处理null
.
您也可以用一个简单地拒绝每个新执行的虚拟 TPE 进行交换,但这相当于调用shutdown()
TPE 时发生的情况。
如果您不想使用shutdown
,请遵循以下方法:
遍历Future
从提交开始的所有任务,并按照建议检查对象ExecutorService
阻塞调用的get()
状态Future
Tim Bender
使用其中之一
ExecutorService
Executors
从 java 8 开始)invokeAll()
on executor service 也达到了同样的目的CountDownLatch
相关的 SE 问题:
您可以在Runner类上调用waitTillDone() :
Runner runner = Runner.runner(10);
runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks
runner.waitTillDone(); // blocks until all tasks are finished (or failed)
// and now reuse it
runner.runIn(500, MILLISECONDS, callable);
runner.waitTillDone();
runner.shutdown();
要使用它,请将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.0'
有关更多详细信息,请查看此处:https ://github.com/MatejTymes/JavaFixes或此处: http: //matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
尝试使用队列大小和活动任务计数,如下所示
while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}