16

这个问题是两年前提出的,但它提到的资源要么不是很有帮助(恕我直言),要么链接不再有效。

必须有一些很好的教程才能理解Phaser。我已经阅读了 javadoc,但是我的眼睛呆滞了,因为为了真正理解 javadoc,您必须知道应该如何使用这些类。

有人有什么建议吗?

4

3 回答 3

47

对于 Phaser,我已经回答了几个问题。看到它们可能有助于理解它们的应用。它们在底部链接。但是要了解 Phaser 的作用以及为什么它有用,了解它解决了什么很重要。

这是 CountdownLatch 和 CyclicBarrier 的属性

笔记:

  • 参与方数量是另一种说法#不同线程数
  • 不可重用意味着您必须在重用之前创建屏障的新实例
  • 如果线程可以到达并继续工作而不等待其他线程或可以等待所有线程完成,则屏障是可推进的

倒计时锁

  • 固定人数
  • 不可重复使用
  • Advanceable (看latch.countDown(); Advanceable latch.await(); 必须等待

循环障碍

  • 固定人数
  • 可重复使用的
  • 不可推进

所以 CountdownLatch 是不可重用的,你必须每次都创建一个新的实例,但它是可重复的。CylicBarrier 可以重复使用,但所有线程必须等待每一方到达屏障。

移相器

  • 动态参与人数
  • 可重复使用的
  • 先进的

当一个线程想要被他们调用的 Phaser 知道phaser.register()时,当线程到达他们调用的屏障时phaser.arrive() ,这里是它可以推进的地方。如果线程想要等待所有已注册的任务完成phaser.arriveAndAwaitAdvance()

还有一个阶段的概念,其中线程可以等待可能尚未完成的其他操作的完成。一旦所有线程都到达移相器的屏障,就会创建一个新阶段(增量为 1)。

你可以看看我的其他答案,也许会有所帮助:

Java ExecutorService:等待终止所有递归创建的任务

灵活的倒计时?

于 2011-07-26T13:59:04.797 回答
8

Phaser至少,我认为 JavaDoc 提供了相当清晰的解释。这是一个用于同步一批线程的类,从某种意义上说,您可以使用 a 注册批处理中的每个线程,Phaser然后使用Phaser让它们阻塞,直到批处理中的每个线程都通知了Phaser,此时任何阻塞的线程都将开始执行。该等待/通知周期可以根据需要/要求一遍又一遍地重复。

他们的示例代码给出了一个合理的例子(尽管我非常不喜欢他们的 2 字符缩进样式):

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 } 

这设置了Phaser一个注册计数为 的tasks.size() + 1,并为每个任务创建一个新的Thread,它将阻塞直到下一次推进Phaser(即tasks.size() + 1到达记录的时间),然后运行其关联的任务。创建的每一个Thread也会立即启动,因此会在记录Phaser到达的情况下退出循环tasks.size()

最终调用phaser.arriveAndDeregister()将记录最终到达,并减少注册计数,使其现在等于tasks.size()。这会导致Phaser前进,这实际上允许所有任务同时开始运行。这可以通过执行以下操作来重复:

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         while (true) {
           phaser.arriveAndAwaitAdvance(); // await all creation
           task.run();
         }
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

...这与以前相同,只是添加了一个导致任务重复运行的循环。因为每个迭代调用phaser.arriveAndAwaitAdvance()任务线程的执行将被同步,这样task-0 不会开始它的第二次迭代,直到所有其他任务完成它的第一次迭代并通知Phaser准备好开始它的第二次迭代。

如果您正在运行的任务的执行时间差异很大,并且如果您想确保较快的线程不会与较慢的线程不同步,这可能会很有用。

对于可能的现实世界应用程序,请考虑运行单独的图形和物理线程的游戏。如果图形线程卡在第 6 帧上,您不想让物理线程计算第 100 帧的数据,并且使用Phaser一种可能的方法来确保图形和物理线程始终在同一帧上工作时间(而且,如果一个线程比另一个线程慢得多,则较快的线程会优雅地产生 CPU 资源,因此希望较慢的线程可以更快地赶上)。

于 2011-07-26T14:09:50.963 回答
2

Phaser在功能上与 CyclicBarrier 和 CountDownLatch 有点相似,但它提供了比它们都更大的灵活性。

在 CyclicBarrier 中,我们曾经在构造函数中注册各方,但 Phaser 为我们提供了随时注册和取消注册各方的灵活性。对于注册方,我们可以使用以下任何一种 -

  • 构造函数
  • 注册,或
  • 批量注册

对于注销方,我们可以使用 -

  • 到达并注销,或


register- 向此移相器添加/注册一个新的未到达方。它返回

  • 此注册适用的到达阶段编号。
  • 如果移相器已终止,则值为负并且注册无效。

如果 onAdvance() 方法的调用正在进行中,则在返回此方法之前可能会等待其完成。如果此移相器有父移相器,并且此移相器没有注册方,则此子移相器也向其父移相器注册。
演示 Phaser 用法的程序>

import java.util.concurrent.Phaser;

public class PhaserTest {
public static void main(String[] args) {

       /*Creates a new phaser with 1 registered unArrived parties 
        * and initial phase number is 0
        */
       Phaser phaser=new Phaser(1);
       System.out.println("new phaser with 1 registered unArrived parties"
                    + " created and initial phase  number is 0 ");

       //Create 3 threads
       Thread thread1=new Thread(new MyRunnable(phaser,"first"),"Thread-1");
       Thread thread2=new Thread(new MyRunnable(phaser,"second"),"Thread-2");
       Thread thread3=new Thread(new MyRunnable(phaser,"third"),"Thread-3");

       System.out.println("\n--------Phaser has started---------------");
       //Start 3 threads
       thread1.start();
       thread2.start();
       thread3.start();

       //get current phase
       int currentPhase=phaser.getPhase();
       /*arriveAndAwaitAdvance() will cause thread to wait until current phase
        * has been completed i.e. until all registered threads
        * call arriveAndAwaitAdvance()
        */
       phaser.arriveAndAwaitAdvance();
       System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");

       //------NEXT PHASE BEGINS------

       currentPhase=phaser.getPhase();
       phaser.arriveAndAwaitAdvance();
       System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");

       /* current thread Arrives and deRegisters from phaser.
        * DeRegistration reduces the number of parties that may
        * be required to advance in future phases.
        */
       phaser.arriveAndDeregister();

       //check whether phaser has been terminated or not.
       if(phaser.isTerminated())
              System.out.println("\nPhaser has been terminated");

} 
}





class MyRunnable implements Runnable{

Phaser phaser;

MyRunnable(Phaser phaser,String name){
       this.phaser=phaser;
       this.phaser.register(); //Registers/Add a new unArrived party to this phaser.
       System.out.println(name +" - New unarrived party has "
                    + "been registered with phaser");
}

@Override
public void run() {
       System.out.println(Thread.currentThread().getName() +
                    " - party has arrived and is working in "
                    + "Phase-"+phaser.getPhase());
       phaser.arriveAndAwaitAdvance();

       //Sleep has been used for formatting output
       try {
              Thread.sleep(1000);
       } catch (InterruptedException e) {
              e.printStackTrace();
       }

       //------NEXT PHASE BEGINS------

       System.out.println(Thread.currentThread().getName() +
                    " - party has arrived and is working in "
                    + "Phase-"+phaser.getPhase());
       phaser.arriveAndAwaitAdvance();  

       phaser.arriveAndDeregister();
}

}


bulkRegister - 将新的未到达方的参与方数量添加到此移相器。它返回

  • 此注册适用的到达阶段编号。
  • 如果移相器已终止,则值为负并且注册无效。

如果 onAdvance() 方法的调用正在进行中,则在返回此方法之前可能会等待其完成。

到达AndDeregister- 当前线程(Party)从移相器到达和取消注册。取消注册减少了未来可能需要进入下一阶段的参与方数量。

如果此移相器有父移相器,并且此移相器没有注册方,则此子移相器也向其父移相器注册。 演示父子移相器的程序

import java.util.concurrent.Phaser;

public class PhaserParentChildTest {
public static void main(String[] args) {


    /*
  * Creates a new phaser with no registered unArrived parties.
  */
    Phaser parentPhaser = new Phaser();

    /*
  * Creates a new phaser with the given parent &
  * no registered unArrived parties.
  */
    Phaser childPhaser = new Phaser(parentPhaser,0);

    childPhaser.register();

    System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
    System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());

    childPhaser.arriveAndDeregister();
    System.out.println("\n--childPhaser has called arriveAndDeregister()-- \n");

    System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
    System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());

}
}

在http://www.javamadesoeasy.com/2015/03/phaser-in-java_21.html上阅读有关 Phaser 的更多信息

于 2015-05-08T20:10:19.827 回答