118

Is there a simple way to wait for all threaded processes to complete? For example, let's say I have:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

How do I alter this so that the main() method pauses at the comment until all threads' run() methods exit? Thanks!

4

14 Answers

165

Put all the threads in an array, start them all, and then loop over the array:

// join() throws InterruptedException, so catch it or declare it
for (int i = 0; i < threads.length; i++)
  threads[i].join();

Each join will block until the respective thread has completed. Threads may complete in a different order than the one in which you join them, but that's not a problem: when the loop exits, all threads are completed.
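
As a minimal, self-contained sketch of the two loops described above: the class name JoinAllDemo, the helper runAndJoin, and the counter that stands in for real work are assumptions made for illustration and are not part of the question.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class JoinAllDemo {

    // Starts `count` threads, waits for all of them with join(), and
    // returns how many run() bodies finished.
    public static int runAndJoin(int count) {
        AtomicInteger finished = new AtomicInteger();
        Thread[] threads = new Thread[count];

        // First loop: create and start every thread.
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> finished.incrementAndGet());
            threads[i].start();
        }

        // Second loop: join() blocks until the given thread has terminated.
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished threads: " + runAndJoin(100));
    }
}
```

Starting every thread before the first join() is what keeps the work concurrent; the join loop only decides when the caller continues.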

answered 2009-08-09T20:31:37.307
42

One way would be to make a List of Threads, create and launch each thread, while adding it to the list. Once everything is launched, loop back through the list and call join() on each one. It doesn't matter what order the threads finish executing in, all you need to know is that by the time that second loop finishes executing, every thread will have completed.

A better approach is to use an ExecutorService and its associated methods:

List<Callable<Object>> callables = ... // assemble list of Callables here
                                       // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
// invokeAll blocks until every task has completed
List<Future<Object>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them
execSvc.shutdown(); // let the pool's threads exit once they are idle

Using an ExecutorService (and all of the new stuff from Java 5's concurrency utilities) is incredibly flexible, and the above example barely even scratches the surface.
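
A runnable sketch of the invokeAll approach might look like the following. The class name InvokeAllDemo, the helper sumOfTasks, the pool size of 4, and the trivial "multiply by two" tasks are all assumptions chosen to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Runs `count` tasks on a pool and returns the sum of their results.
    public static int sumOfTasks(int count) {
        List<Callable<Integer>> callables = new ArrayList<>();
        for (int n = 0; n < count; n++) {
            final int value = n;
            callables.add(() -> value * 2); // placeholder work
        }

        ExecutorService execSvc = Executors.newFixedThreadPool(4);
        try {
            // invokeAll returns only after every task has completed.
            List<Future<Integer>> results = execSvc.invokeAll(callables);
            int sum = 0;
            for (Future<Integer> f : results) {
                sum += f.get(); // tasks are done, so get() does not block
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e);
        } finally {
            execSvc.shutdown(); // otherwise the pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfTasks(10)); // 2 * (0 + 1 + ... + 9) = 90
    }
}
```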

answered 2009-08-09T20:31:55.013
28
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable
{
   public static void main(String[] args) throws ExecutionException, InterruptedException
   {
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<?>> futures = new ArrayList<Future<?>>();

      for (int n = 0; n < 1000; n++)
      {
         // submit(Runnable) returns a Future<?> whose get() yields null
         Future<?> f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      }

      // wait for all tasks to complete before continuing
      for (Future<?> f : futures)
      {
         f.get();
      }

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   }

   public void run()
   {
      // do something here
   }
}
answered 2009-08-10T00:25:01.303
12

Instead of join(), which is the older low-level API, you can use CountDownLatch. I have modified your code as below to fulfil your requirement.

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch){
        this.latch = latch;
    } 
    public void run() {
        try{
            System.out.println("Do some thing");
        }catch(Exception err){
            err.printStackTrace();
        }finally{
            // count down even if the work throws, so await() cannot hang
            latch.countDown();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try{
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

Explanation:

  1. The CountDownLatch is initialized with a count of 1000, as per your requirement.

  2. Each worker thread (DoSomethingInAThread) decrements the CountDownLatch that was passed to its constructor.

  3. The main thread (CountDownLatchDemo) blocks in await() until the count reaches zero. Once it does, you will get the line below in the output.

    In Main thread after completion of 1000 threads
    

More info from the Oracle documentation page:

public void await()
           throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted.

Refer to related SE question for other options:

wait until all threads finish their work in java

answered 2016-05-09T12:40:14.890
8

Avoid the Thread class altogether and instead use the higher abstractions provided in java.util.concurrent

The ExecutorService class provides the method invokeAll that seems to do just what you want.
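
When the tasks are plain Runnables rather than Callables, another option from the same package (not the invokeAll mentioned above) is to submit everything, call shutdown(), and then block in awaitTermination(). The sketch below assumes a hypothetical class AwaitTerminationDemo, a pool of 8 threads, and a one-minute timeout; none of these come from the answer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitTerminationDemo {

    // Runs `count` Runnables on a pool and returns how many completed.
    public static int runTasks(int count) {
        AtomicInteger done = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);

        for (int n = 0; n < count; n++) {
            pool.execute(() -> done.incrementAndGet()); // placeholder work
        }

        // shutdown() stops new submissions; already queued tasks still run.
        pool.shutdown();
        try {
            // Block until all tasks have finished or the timeout elapses.
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(1000));
    }
}
```

The trade-off is that the pool cannot be reused after shutdown(), so this fits a one-off batch better than a long-lived service.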

answered 2009-08-09T20:52:03.023
5

Consider using java.util.concurrent.CountDownLatch. Examples in javadocs

answered 2014-11-13T15:14:22.283
5

As Martin K suggested, java.util.concurrent.CountDownLatch seems to be a better solution for this. Here is an example of it:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo
{

    public static void main (String[] args)
    {
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        {
            new Thread()
            {

                @Override
                public void run ()
                {

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    {
                        // Dummy sleep
                        Thread.sleep(3000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        // One thread has completed its job; count down even
                        // if interrupted so that await() cannot hang
                        executionCompleted.countDown();
                    }
                }

            }.start();
        }

        try
        {
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

}
answered 2015-03-04T05:58:50.907
4

Depending on your needs, you may also want to check out the classes CountDownLatch and CyclicBarrier in the java.util.concurrent package. They can be useful if you want your threads to wait for each other, or if you want more fine-grained control over the way your threads execute (e.g., waiting in their internal execution for another thread to set some state). You could also use a CountDownLatch to signal all of your threads to start at the same time, instead of starting them one by one as you iterate through your loop. The standard API docs have an example of this, plus using another CountDownLatch to wait for all threads to complete their execution.
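
The two-latch pattern mentioned above (one latch as a start gate, a second one to wait for completion) could be sketched roughly like this. The class name StartGateDemo, the helper runWorkers, and the counter standing in for real work are assumptions; the variable names startSignal and doneSignal are also just illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGateDemo {

    // Releases `count` workers at the same moment, waits for all of them,
    // and returns how many completed their work.
    public static int runWorkers(int count) {
        CountDownLatch startSignal = new CountDownLatch(1);     // start gate
        CountDownLatch doneSignal = new CountDownLatch(count);  // completion
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();          // wait until the gate opens
                    completed.incrementAndGet();  // placeholder work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown();       // always report completion
                }
            }).start();
        }

        startSignal.countDown(); // open the gate for all workers at once
        try {
            doneSignal.await();  // block until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("workers completed: " + runWorkers(20));
    }
}
```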

answered 2009-08-09T21:50:53.030
3

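If you make a list of threads, you can loop through them and call .join() on each one; your loop will finish once all the threads have finished. I haven't tried it, though.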

http://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join()

answered 2009-08-09T20:31:00.350
1

Create the thread object inside the first for loop.

for (int i = 0; i < threads.length; i++) {
     threads[i] = new Thread(new Runnable() {
         public void run() {
             // some code to run in parallel
         }
     });
     threads[i].start();
 }

And then do what everyone here is saying.

for (int i = 0; i < threads.length; i++)
  threads[i].join();
answered 2014-09-17T07:58:51.870
0

You can do this with the "ThreadGroup" object and its activeCount() method:
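
The answer's own code is not shown here, so the following is only a guess at what polling a ThreadGroup could look like. The class name ThreadGroupPollDemo, the group name "workers", the sleep durations, and the polling loop are all assumptions. Note that the Javadoc describes activeCount() as an estimate, so join() or a latch is the more dependable choice.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadGroupPollDemo {

    // Starts `count` threads in one ThreadGroup, polls activeCount() until
    // no thread in the group is alive, and returns how many finished.
    public static int runInGroup(int count) {
        ThreadGroup group = new ThreadGroup("workers"); // hypothetical name
        AtomicInteger finished = new AtomicInteger();

        for (int n = 0; n < count; n++) {
            new Thread(group, () -> {
                try {
                    Thread.sleep(20); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.incrementAndGet();
            }).start();
        }

        try {
            // Poll until the group reports no active threads.
            while (group.activeCount() > 0) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished threads: " + runInGroup(10));
    }
}
```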

answered 2009-08-09T20:27:47.207
0

As an alternative to CountDownLatch you can also use CyclicBarrier e.g.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ThreadWaitEx {
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
        public void run(){
            System.out.println("clean up job after all tasks are done.");
        }
    });
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
        }       
    }

}    

class MyCallable implements Runnable{
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b){
        this.b = b;
    }
    @Override
    public void run(){
        try {
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }       
}

To use CyclicBarrier in this case, barrier.await() should be the last statement, i.e. it is called when your thread is done with its job. A CyclicBarrier can be reused after the waiting threads are released, and its reset() method returns it to its initial state. To quote the javadocs:

A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
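
In the code above, main() returns right after starting the threads and only the barrier action runs once the tasks are done. If main() itself should pause, one possible variation is to register the main thread as an extra party. The sketch below assumes a hypothetical class BarrierWaitDemo and a counter as placeholder work.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierWaitDemo {

    // Starts `workers` threads and blocks the caller at the barrier until
    // each of them has finished its work; returns the completed count.
    public static int runWorkers(int workers) {
        AtomicInteger completed = new AtomicInteger();
        // workers + 1 parties: the calling thread is a party as well.
        CyclicBarrier barrier = new CyclicBarrier(workers + 1);

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // placeholder work
                    barrier.await();             // last statement of the job
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed; nothing left to wait for
                }
            }).start();
        }

        try {
            barrier.await(); // returns once every worker has arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException("barrier was broken", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("workers completed: " + runWorkers(10));
    }
}
```

Unlike join(), the barrier only guarantees that each worker has reached await(), not that its thread has terminated, and every worker thread stays blocked until the last party arrives.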

answered 2016-09-16T16:12:59.943
0

join() was not helpful to me. See this sample in Kotlin:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
        }
    })

The result:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

Now let me use the join() for threads:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
            t.join()
        }
    })

And the result:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

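As is clear, when join() is called right after each start() inside the loop:

  1. The threads run sequentially, because each join() blocks until that thread finishes before the next one is started.
  2. The first sample takes 765 milliseconds while the second sample takes 3833 milliseconds.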

Our solution to prevent blocking other threads was creating an ArrayList:

val threads = ArrayList<Thread>()

Now when we want to start a new thread we must add it to the ArrayList:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable {
        ...
    })
)

The addThreadToArray function:

@Synchronized
fun addThreadToArray(th: Thread) {
    threads.add(th)
}

The startNewThread function:

fun startNewThread(runnable: Runnable) : Thread {
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th
}

Check whether the threads have completed, as below, wherever it's needed:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
    // The size is 0 -> there are no alive threads.
}
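
For comparison, here is a Java sketch of the same measurement with every thread started first and joined only afterwards, which keeps the threads concurrent while still blocking until all are done. The class name StartThenJoinDemo, the helper elapsedMillis, and the sleep that stands in for real work are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class StartThenJoinDemo {

    // Starts `count` threads that each sleep for `sleepMillis`, joins them
    // all afterwards, and returns the total elapsed time in milliseconds.
    public static long elapsedMillis(int count, long sleepMillis) {
        long start = System.nanoTime();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(sleepMillis); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();      // start now, but do not join inside this loop
            threads.add(t);
        }

        try {
            for (Thread t : threads) {
                t.join();   // all threads are already running in parallel
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Should be close to one sleep (about 200 ms), not the sum of five.
        System.out.println("elapsed ms: " + elapsedMillis(5, 200));
    }
}
```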
answered 2019-07-24T13:01:35.580
0

The problem with:

for(i = 0; i < threads.length; i++)
  threads[i].join();

...is that threads[i + 1] can never be joined before threads[i]. Except for the latch-based ones, all the solutions share this shortcoming.

No one here has (yet) mentioned ExecutorCompletionService, which lets you join threads/tasks in their completion order:

public class ExecutorCompletionService<V> extends Object implements CompletionService<V>

A CompletionService that uses a supplied Executor to execute tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take. The class is lightweight enough to be suitable for transient use when processing groups of tasks.

Usage Examples.

Suppose you have a set of solvers for a certain problem, each returning a value of some type Result, and would like to run them concurrently, processing the results of each of them that return a non-null value, in some method use(Result r). You could write this as:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
  CompletionService<Result> cs = new ExecutorCompletionService<>(e);
  solvers.forEach(cs::submit);
  for (int i = solvers.size(); i > 0; i--) {
    Result r = cs.take().get();
    if (r != null)
      use(r);
  }
}

Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
  CompletionService<Result> cs = new ExecutorCompletionService<>(e);
  int n = solvers.size();
  List<Future<Result>> futures = new ArrayList<>(n);
  Result result = null;
  try {
    solvers.forEach(solver -> futures.add(cs.submit(solver)));
    for (int i = n; i > 0; i--) {
      try {
        Result r = cs.take().get();
        if (r != null) {
          result = r;
          break;
        }
      } catch (ExecutionException ignore) {}
    }
  } finally {
    futures.forEach(future -> future.cancel(true));
  }

  if (result != null)
    use(result);
}

Since: 1.5 (!)

Assuming use(r) (of Example 1) is also asynchronous, this is a big advantage.
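
The Javadoc excerpts above rely on an undefined Result type and use(...) method, so here is a self-contained sketch of the same idea. The class name CompletionOrderDemo, the helper completionOrder, and the tasks that simply sleep for a given delay are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Submits one task per delay (each sleeps, then returns its delay) and
    // returns the delays in the order in which the tasks completed.
    public static List<Integer> completionOrder(int... delaysMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        List<Integer> order = new ArrayList<>();
        try {
            for (int delay : delaysMillis) {
                cs.submit(() -> {
                    Thread.sleep(delay); // stand-in for real work
                    return delay;
                });
            }
            // take() hands back futures as tasks finish, not as submitted.
            for (int i = 0; i < delaysMillis.length; i++) {
                order.add(cs.take().get());
            }
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The shortest delay should normally come out first.
        System.out.println(completionOrder(450, 50, 250));
    }
}
```

Once the loop has called take() as many times as tasks were submitted, every task has completed, so this also answers the original "wait for all" question.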

answered 2021-12-22T21:06:04.990