229

等待所有任务ExecutorService完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的作业——每个核心上一个。现在我的设置如下所示:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现可运行。这似乎可以正确执行任务,但代码wait()IllegalMonitorStateException. 这很奇怪,因为我玩了一些玩具示例,它似乎可以工作。

uniquePhrases包含数以万计的元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西

4

16 回答 16

240

最简单的方法是使用ExecutorService.invokeAll()which 在单行中执行您想要的操作。用您的话说,您需要修改或包装ComputeDTask来实现Callable<>,这可以为您提供更多的灵活性。可能在您的应用程序中有一个有意义的实现Callable.call(),但如果不使用,这里有一种包装它的方法Executors.callable()

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

正如其他人指出的那样,invokeAll()如果合适,您可以使用超时版本。在此示例中,answers将包含一堆Future将返回空值的 s(Executors.callable()ComputeDTask参见不要从你的例子中看出。

如果不清楚,请注意invokeAll()在完成所有任务之前不会返回。(即,如果被询问,您集合Future中的所有 s都会报告。)这避免了所有手动关闭、awaitTermination 等......如果需要,您可以在多个周期内巧妙地重复使用它。answers.isDone()ExecutorService

关于 SO 有几个相关的问题:

对于您的问题,这些都不是严格的,但它们确实为人们认为Executor/ExecutorService应该如何使用提供了一些色彩。

于 2010-07-17T01:45:45.620 回答
66

如果要等待所有任务完成,请使用shutdown方法而不是wait. 然后跟随它awaitTermination

此外,您可以使用Runtime.availableProcessors来获取硬件线程的数量,以便正确初始化线程池。

于 2010-07-16T23:16:01.463 回答
49

如果等待所有任务ExecutorService完成并不是您的目标,而是等待特定批次的任务完成,您可以使用 a CompletionService— 具体来说,一个ExecutorCompletionService.

这个想法是创建一个ExecutorCompletionService包装Executor,通过提交一些已知数量的任务,然后使用(哪个块)或(哪个不)从完成队列CompletionService中绘制相同数量的结果。一旦您绘制了与您提交的任务相对应的所有预期结果,您就知道它们都已完成。take()poll()

让我再说一遍,因为从界面上看并不明显:您必须知道您放入了CompletionService多少东西才能知道要尝试提取多少东西。这对于该方法尤其重要take():多次调用它,它会阻塞您的调用线程,直到某个其他线程将另一个作业提交给相同的CompletionService.

Java Concurrency in Practice一书中有一些例子展示了如何使用CompletionService

于 2010-07-16T23:53:50.527 回答
13

如果您想等待执行器服务完成执行,请调用shutdown(),然后调用awaitTermination(units, unitType),例如awaitTermination(1, MINUTE)。ExecutorService 不会在它自己的监视器上阻塞,所以你不能使用wait等。

于 2010-07-16T23:16:40.883 回答
8

您可以在一定的时间间隔内等待作业完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

或者你可以使用ExecutorService提交Runnable)并收集它返回的Future对象并依次调用get()以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

正确处理InterruptedException非常重要。它可以让您或您的图书馆的用户安全地终止一个漫长的过程。

于 2010-07-17T00:33:33.883 回答
7

只需使用

latch = new CountDownLatch(noThreads)

在每个线程

latch.countDown();

并作为屏障

latch.await();
于 2015-01-22T10:20:55.200 回答
6

IllegalMonitorStateException的根本原因:

抛出以指示线程已尝试在对象的监视器上等待,或通知其他线程在对象的监视器上等待而不拥有指定的监视器。

从您的代码中,您刚刚在 ExecutorService 上调用了 wait() 而没有拥有锁。

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

按照以下方法之一等待已提交的所有任务完成ExecutorService

  1. 从on 开始遍历所有Future任务并使用阻塞调用对象检查状态submitExecutorServiceget()Future

  2. 使用invokeAll _ExecutorService

  3. 使用CountDownLatch

  4. 使用ForkJoinPoolnewWorkStealingPoolExecutors从 java 8 开始)

  5. 按照 oracle 文档页面中的建议关闭池

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    如果您想在使用选项 5 而不是选项 1 到 4 时优雅地等待所有任务完成,请更改

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    awhile(condition)每 1 分钟检查一次。

于 2016-04-18T16:09:02.790 回答
6

您可以使用ExecutorService.invokeAll方法,它将执行所有任务并等待所有线程完成任务。

这是完整的javadoc

您还可以使用此方法的重载版本来指定超时。

这是示例代码ExecutorService.invokeAll

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}
于 2018-04-10T05:42:04.677 回答
3

我也有需要抓取一组文档的情况。我从一个应该处理的初始“种子”文档开始,该文档包含指向也应该处理的其他文档的链接,依此类推。

在我的主程序中,我只想编写如下内容,其中Crawler控制一堆线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

如果我想导航一棵树,也会发生同样的情况。我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程会处理树中的所有节点,直到没有更多节点为止。

我在 JVM 中找不到任何我认为有点令人惊讶的东西。所以我写了一个类ThreadPool,它可以直接使用或子类来添加适合域的方法,例如schedule(Document). 希望能帮助到你!

线程池 Javadoc | 马文

于 2010-10-14T17:55:51.513 回答
2

添加集合中的所有线程并使用invokeAll. 如果您可以使用invokeAll的方法ExecutorService,则 JVM 在所有线程都完成之前不会继续执行下一行。

这里有一个很好的例子: invokeAll via ExecutorService

于 2014-06-06T11:23:04.330 回答
1

将您的任务提交到Runner中,然后等待调用方法waitTillDone(),如下所示:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

要使用它,请添加此 gradle/maven 依赖项:'com.github.matejtymes:javafixes:1.0'

有关更多详细信息,请查看此处:https ://github.com/MatejTymes/JavaFixes或此处: http: //matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

于 2016-04-29T10:18:24.117 回答
0

我将等待执行程序以您认为适合完成任务的指定超时终止。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }
于 2014-02-10T12:54:40.363 回答
0

听起来您需要ForkJoinPool并使用全局池来执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

美妙之处在于pool.awaitQuiescence该方法将阻止利用调用者的线程来执行其任务,然后在它真的为空时返回。

于 2015-04-19T04:06:59.743 回答
0

有几种方法。

您可以先调用ExecutorService.shutdown,然后调用ExecutorService.awaitTermination ,它会返回:

如果此执行程序终止,则为 true;如果在终止前超时,则为 false

所以:

有一个名为 awaitTermination 的函数,但必须在其中提供超时。这并不能保证当它返回时所有任务都会完成。有没有办法做到这一点?

你只需要awaitTermination循环调用。

使用 awaitTermination

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用 CountDownLatch

另一种选择是创建一个等于并行任务数量的CountDownLatch 。count每个线程调用countDownLatch.countDown();,而线程调用countDownLatch.await();

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用循环障碍

另一种方法是使用循环障碍

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要更改您的初始要求,即:

使用 ExecutorService.execute() 提交所有任务时如何等待它们完成。

于 2022-02-22T09:38:34.860 回答
0

这个怎么样?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

如果您不向 ExecutorService 添加新任务,这可能会等待所有当前任务完成

于 2020-08-10T04:00:15.883 回答
-1

一个简单的替代方法是将线程与连接一起使用。参考:加入线程

于 2012-08-12T05:32:23.877 回答