5

假设我有一个保存 SlaveThread 对象列表的 Master。在每个时间步上,我希望 Master 并行运行 SlaveThreads,但是,在时间步长结束时,我希望 SlaveThreads 在前进之前相互等待对方完成当前时间步长。另外,我不想在每个时间步重新实例化 SlaveThreads。我有 2 种可能的解决方案,但我不知道如何使它们中的任何一个起作用:

1) SlaveThread 中的run() 方法处于while(true) 循环中。在 SlaveThread 中执行单个循环后,我会让 SlaveThread 通知 Master(我不知道该怎么做),而 Master 会执行类似的操作

try{
    for (int i = 0; i < numSlaveThreads; i++) {
        while (!slaveThreads[i].getCompletedThisIter()) {
        wait()
        }
      }
  System.out.println("Joined");

}

在进入下一个时间步之前。我该怎么做?我怎样才能让一个 SlaveThread 只通知主人?

2) Slave 中的 run() 不在 while(true) 循环中,那么我必须在每次迭代时对其调用 start()。但是此时Slave的线程状态会被终止。如何在不重新实例化的情况下再次调用 start() ?

4

2 回答 2

5

这正是障碍的用途,您可以使用CyclicBarrierCountDownLatch来实现这一点。这些是用于延迟线程进度直到达到所需状态的同步器,在您的情况下,线程已经完成了它们的计算。

这取决于您要如何实现的细节:

锁存器用于等待事件;屏障用于等待其他线程。

对于CyclicBarrier将以以下方式完成的操作:

// whereby count is the number of your slave threads
this.barrier = new CyclicBarrier(count); 

然后在Runnable你的奴隶的定义中,你将在计算结束时插入:barrier.await()

public class Slaves implements Runnable {

   // ...

   @Override
   public void run() {

      while(condition) {

         // computation
         // ...

         try {
            // do not proceed, until all [count] threads
            // have reached this position
            barrier.await();
         } catch (InterruptedException ex) {
            return;
         } catch (BrokenBarrierException ex) {
            return;
         }
      }
   }
}

在所有线程完成计算之前,您的从属线程将不会继续。这样你就不需要实现另一个主线程之间的信号。

但是,如果您有一些代码要在所有线程到达该位置后执行(主信号),您可以向构造函数传递一个附加值Runnable,该CyclicBarrier构造函数将在所有线程到达屏障后执行。

this.barrier = new CyclicBarrier(count,
   new Runnable() {
      @Override
      public void run() {
         // signal your master thread, update values, etc.
      }
    }
 );
于 2012-07-22T17:58:52.617 回答
3

您可以使用ExecutorService的组合来管理您的线程(即,您可以回收您的线程而不必在每个周期创建新线程)和一个CyclicBarrier,它将同步所有从属。

请参阅下面的一个简单示例,其中主设备循环启动从设备,确保它们在重新启动之前都已完成。奴隶们有点懒,只是睡了一段时间(不是随机的):

public class Test {

    private static final ExecutorService executor = Executors.newFixedThreadPool(5);
    private static final CyclicBarrier barrier = new CyclicBarrier(5); //4 slaves + 1 master

    public static void main(String[] args) throws InterruptedException {
        Runnable master = new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println("Starting slaves");
                        for (int i = 100; i < 500; i += 100) {
                            executor.submit(getRunnable(i));
                        }
                        barrier.await();
                        System.out.println("All slaves done");
                    }
                } catch (InterruptedException | BrokenBarrierException ex) {
                    System.out.println("Bye Bye");
                }
            }
        };

        executor.submit(master);
        Thread.sleep(2000);
        executor.shutdownNow();

    }

    public static Runnable getRunnable(final int sleepTime) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("Entering thread " + Thread.currentThread() + " for " + sleepTime + " ms.");
                    Thread.sleep(sleepTime);
                    System.out.println("Exiting thread " + Thread.currentThread());
                    barrier.await();
                } catch (BrokenBarrierException | InterruptedException ex) {
                }
            }
        };

    }
}
于 2012-07-22T17:59:19.683 回答