33

我的问题:如何在 a 上执行一堆线程对象ThreadPoolExecutor并等待它们全部完成后再继续?

我是 ThreadPoolExecutor 的新手。所以这段代码是一个测试来了解它是如何工作的。现在我什至没有BlockingQueue用对象填充 ,因为我不明白如何在不调用execute()另一个RunnableObject. 无论如何,现在我只是打电话awaitTermination(),但我想我仍然错过了一些东西。任何提示都会很棒!谢谢。

public void testThreadPoolExecutor() throws InterruptedException {
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) {
    ex.execute(new RunnableObject(i + 1));
  }
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");
}

RunnableObject 类:

package playground;

public class RunnableObject implements Runnable {

  private final int id;

  public RunnableObject(int id) {
    this.id = id;
  }

  @Override
  public void run() {
    System.out.println("ID: " + id + " started");
    try {
      Thread.sleep(2354);
    } catch (InterruptedException ignore) {
    }
    System.out.println("ID: " + id + " ended");
  }
}
4

6 回答 6

54

你应该循环awaitTermination

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
  log.info("Awaiting completion of threads.");
}
于 2012-06-07T16:15:19.567 回答
8

您的问题似乎是shutdown在您将所有作业提交到您的池后您没有打电话。没有shutdown()你的awaitTermination将永远返回错误。

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

您还可以执行以下操作来等待所有作业完成:

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
   // this joins with the submitted job
   future.get();
}
...
// still need to shutdown at the end
ex.shutdown();

此外,因为您正在休眠2354几毫秒但只等待所有作业终止,所以2 SECONDSawaitTermination始终返回false。我会用Long.MAX_VALUE等待工作完成。

最后,听起来您正在担心创建一个新的ThreadPoolExecutor,而您想重用第一个。不要这样。与您为检测作业是否完成而编写的任何代码相比,GC 开销将非常小。


引用javadocs ThreadPoolExecutor.shutdown(),:

启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。

在该ThreadPoolExecutor.awaitTermination(...)方法中,它正在等待执行者的状态进入TERMINATED。但首先状态必须转到SHUTDOWNifshutdown()被调用或STOPifshutdownNow()被调用。

于 2012-06-07T15:05:07.873 回答
3

这与执行者本身无关。只需使用接口的java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>). 它将阻塞,直到所有Callables 完成。

执行者注定要长寿;超出一组任务的生命周期。shutdown用于应用程序完成和清理时。

于 2012-06-07T15:13:15.853 回答
2

这是接受的答案的变体,如果/当抛出 InterruptedException 时处理重试:

executor.shutdown();

boolean isWait = true;

while (isWait)
{
    try
    {             
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        {
            log.info("Awaiting completion of bulk callback threads.");
        }
    } catch (InterruptedException e) {
        log.debug("Interruped while awaiting completion of callback threads - trying again...");
    }
}
于 2015-02-08T06:38:55.167 回答
1

另一种方法是使用 CompletionService,如果您必须尝试任何任务结果,这非常有用:

//run 3 task at time
final int numParallelThreads = 3;

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++){
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);
}

executor.shutdown(); //it cannot be queued more task

try {
    for (int t = 0; t < numTaskToStart ; t++) {
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    }
} catch (InterruptedException e) {
    //termination of all started tasks (it returns all not started tasks in queue)
    executor.shutdownNow();
} catch (ExecutionException e) {
    // ... something to catch ...
}
于 2014-06-15T10:09:35.597 回答
-1

试试这个,

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}

要添加的行

ex.shutdown();
ex.awaitTermination(timeout, unit)
于 2012-06-07T15:15:37.853 回答