428

我需要一次执行一些任务 4,如下所示:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

全部完成后如何获得通知?现在我想不出比设置一些全局任务计数器并在每个任务结束时减少它更好的方法,然后在无限循环中监视这个计数器变为 0;或获取 Futures 列表并在无限循环中为所有这些监视器 isDone。什么是不涉及无限循环的更好的解决方案?

谢谢。

4

27 回答 27

494

基本上ExecutorService你打电话shutdown(),然后awaitTermination()

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
于 2009-08-09T04:44:49.593 回答
192

使用CountDownLatch

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

并在您的任务中(包含在 try / finally 中)

latch.countDown();
于 2009-08-09T04:46:20.573 回答
93

ExecutorService.invokeAll()为你做。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
于 2009-08-09T04:52:19.127 回答
50

您也可以使用期货清单:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

然后当你想加入所有这些时,它基本上相当于加入每个,(额外的好处是它将异常从子线程重新引发到主线程):

for(Future f: this.futures) { f.get(); }

基本上,诀窍是一次在每个 Future 上调用 .get(),而不是在(全部或每个)上无限循环调用 isDone()。因此,一旦最后一个线程完成,您就可以保证“继续”通过并越过这个块。需要注意的是,由于 .get() 调用重新引发异常,如果其中一个线程死亡,您可能会在其他线程完成完成之前从这里引发[为避免这种情况,您可以catch ExecutionException在 get 调用周围添加一个]。另一个警告是它保留对所有线程的引用,因此如果它们具有线程局部变量,直到您通过此块之后才会收集它们(尽管您可以通过删除来解决这个问题,如果它成为一个问题Future 不在 ArrayList 中)。如果你想知道哪个 Future “先完成”https://stackoverflow.com/a/31885029/32453

于 2012-11-02T16:07:38.917 回答
47

在 Java8 中,您可以使用CompletableFuture来做到这一点:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
于 2016-04-29T10:45:01.513 回答
28

只是我的两分钱。为了克服CountDownLatch事先知道任务数量的要求,您可以使用简单的Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

在你的任务中,只要s.release()你愿意latch.countDown();

于 2012-01-19T10:48:28.830 回答
15

游戏有点晚了,但为了完成......

与其“等待”所有任务完成,不如按照好莱坞的原则来思考,“不要打电话给我,我会打电话给你”——当我完成时。我认为生成的代码更优雅......

Guava 提供了一些有趣的工具来实现这一点。

一个例子:

将 ExecutorService 包装到 ListeningExecutorService 中:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

提交可调用的集合以执行 ::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

现在是必不可少的部分:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

将回调附加到 ListenableFuture,您可以使用它在所有期货完成时收到通知:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

这还提供了一个优势,即您可以在处理完成后将所有结果收集在一个地方......

更多信息在这里

于 2014-11-07T12:19:15.283 回答
12

Java 5 及更高版本中的CyclicBarrier类就是为这类事情而设计的。

于 2009-08-09T07:54:55.483 回答
10

这里有两个选项,只是有点混淆哪个最好。

选项1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

选项 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

在这里放 future.get(); 在 try catch 中是个好主意,对吗?

于 2018-09-02T16:14:15.960 回答
8

遵循以下方法之一。

  1. 遍历所有从on返回的Future任务,并按照建议通过阻塞调用对象检查状态submitExecutorServiceget()FutureKiran
  2. invokeAll()ExecutorService上使用
  3. 倒计时锁存器
  4. ForkJoinPoolExecutors.html#newWorkStealingPool
  5. shutdown, awaitTermination, shutdownNow以正确的顺序使用ThreadPoolExecutor 的 API

相关的 SE 问题:

Java多线程中如何使用CountDownLatch?

如何正确关闭 java ExecutorService

于 2016-04-19T12:15:13.133 回答
5

您可以将您的任务包装在另一个可运行的文件中,这将发送通知:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
于 2009-08-09T04:46:45.753 回答
3

我刚刚编写了一个示例程序来解决您的问题。没有给出简洁的实现,所以我会添加一个。虽然您可以使用executor.shutdown()and executor.awaitTermination(),但这并不是最佳实践,因为不同线程所花费的时间是不可预测的。

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
于 2012-12-30T19:15:36.497 回答
3

只是为了在这里提供更多不同的选择来使用闩锁/屏障。您还可以获得部分结果,直到所有结果都使用CompletionService 完成

来自 Java Concurrency in practice:“如果您有一批计算要提交给 Executor,并且您希望在它们可用时检索它们的结果,您可以保留与每个任务关联的 Future 并通过调用 get 来重复轮询完成超时为零。这是可能的,但很乏味。幸运的是有一个更好的方法:完成服务。

这里的实现

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
于 2016-03-04T17:51:40.230 回答
3

这是我的解决方案,基于“AdamSkywalker”提示,它有效

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}
于 2018-05-23T15:26:45.347 回答
3

使用 ExecutorService 的清洁方式

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }
于 2020-09-09T21:00:01.843 回答
2

您可以使用以下代码:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
于 2017-05-23T06:15:40.980 回答
2

所以我在这里发布链接问题的答案,以防有人想要更简单的方法来做到这一点

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
于 2018-05-07T18:49:09.237 回答
2

我创建了以下工作示例。这个想法是有一种方法来处理具有许多线程(由 numberOfTasks/threshold 以编程方式确定)的任务池(我以队列为例),并等到所有线程完成以继续进行一些其他处理。

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

希望能帮助到你!

于 2019-01-18T21:35:40.147 回答
1

您可以使用自己的ExecutorCompletionService子类进行包装taskExecutor,并使用自己的BlockingQueue实现在每个任务完成时获得通知,并在完成的任务数量达到您想要的目标时执行您想要的任何回调或其他操作。

于 2009-08-09T04:49:00.383 回答
1

你应该使用executorService.shutdown()executorService.awaitTermination方法。

一个例子如下:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
于 2016-06-21T05:41:18.030 回答
0

Java 8 - 我们可以使用流 API 来处理流。请看下面的片段

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
于 2016-07-19T23:32:06.777 回答
0

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

如果doSomething()抛出一些其他异常,latch.countDown()似乎不会执行,那我该怎么办?

于 2019-02-21T15:56:30.767 回答
0

如果您连续使用更多线程 ExecutionServices 并希望等待 EACH EXECUTIONSERVICE 完成。最好的方法如下所示;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}
于 2019-09-05T21:59:48.447 回答
0

使用Project LoomAutoCloseable在执行程序服务上尝试资源语法

Project Loom旨在为 Java 的并发能力添加新特性。

其中一项功能是制作. 这意味着每个实现都会提供一个方法。这意味着我们可以使用try-with-resources语法来自动关闭一个对象。ExecutorService AutoCloseableExecutorServicecloseExecutorService

ExecutorService#close方法会阻塞,直到所有提交的任务都完成。usingclose代替了调用shutdown& awaitTermination

存在AutoCloseable有助于 Project Loom 将“结构化并发”</a> 引入 Java 的尝试。

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

有关 Project Loom 的更多信息,请搜索 Ron Pressler 和 Project Loom 团队的其他人提供的谈话和采访。随着 Project Loom 的发展,关注最近的事情。

基于早期访问Java 18的 Project Loom 技术的实验版本现已推出

于 2021-06-30T08:13:23.373 回答
-1

这可能会有所帮助

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
于 2015-10-22T05:57:56.277 回答
-1

你可以在这个Runner类上调用waitTillDone() :

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

在调用shutdown() 之前,您可以重用这个类并多次调用waitTillDone(),而且您的代码非常简单。此外,您不必预先知道任务的数量。

要使用它,只需将此 gradle/mavencompile 'com.github.matejtymes:javafixes:1.3.1'依赖项添加到您的项目中。

更多详情可在这找到:

https://github.com/MatejTymes/JavaFixes

于 2016-04-29T01:29:21.863 回答
-2

executor 中有一个方法getActiveCount()- 给出活动线程的计数。

跨越线程后,我们可以检查该activeCount()值是否为0. 一旦该值为零,则意味着当前没有正在运行的活动线程,这意味着任务已完成:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
于 2014-10-31T09:28:49.267 回答