这个问题是两年前提出的,但它提到的资源要么不是很有帮助(恕我直言),要么链接不再有效。
必须有一些很好的教程才能理解Phaser
。我已经阅读了 javadoc,但是我的眼睛呆滞了,因为为了真正理解 javadoc,您必须知道应该如何使用这些类。
有人有什么建议吗?
对于 Phaser,我已经回答了几个问题。看到它们可能有助于理解它们的应用。它们在底部链接。但是要了解 Phaser 的作用以及为什么它有用,了解它解决了什么很重要。
这是 CountdownLatch 和 CyclicBarrier 的属性
笔记:
倒计时锁
latch.countDown();
Advanceable
latch.await();
必须等待)循环障碍
所以 CountdownLatch 是不可重用的,你必须每次都创建一个新的实例,但它是可重复的。CylicBarrier 可以重复使用,但所有线程必须等待每一方到达屏障。
移相器
当一个线程想要被他们调用的 Phaser 知道phaser.register()
时,当线程到达他们调用的屏障时phaser.arrive()
,这里是它可以推进的地方。如果线程想要等待所有已注册的任务完成phaser.arriveAndAwaitAdvance()
还有一个阶段的概念,其中线程可以等待可能尚未完成的其他操作的完成。一旦所有线程都到达移相器的屏障,就会创建一个新阶段(增量为 1)。
你可以看看我的其他答案,也许会有所帮助:
Phaser
至少,我认为 JavaDoc 提供了相当清晰的解释。这是一个用于同步一批线程的类,从某种意义上说,您可以使用 a 注册批处理中的每个线程,Phaser
然后使用Phaser
让它们阻塞,直到批处理中的每个线程都通知了Phaser
,此时任何阻塞的线程都将开始执行。该等待/通知周期可以根据需要/要求一遍又一遍地重复。
他们的示例代码给出了一个合理的例子(尽管我非常不喜欢他们的 2 字符缩进样式):
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
这设置了Phaser
一个注册计数为 的tasks.size() + 1
,并为每个任务创建一个新的Thread
,它将阻塞直到下一次推进Phaser
(即tasks.size() + 1
到达记录的时间),然后运行其关联的任务。创建的每一个Thread
也会立即启动,因此会在记录Phaser
到达的情况下退出循环tasks.size()
。
最终调用phaser.arriveAndDeregister()
将记录最终到达,并减少注册计数,使其现在等于tasks.size()
。这会导致Phaser
前进,这实际上允许所有任务同时开始运行。这可以通过执行以下操作来重复:
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
while (true) {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
...这与以前相同,只是添加了一个导致任务重复运行的循环。因为每个迭代调用phaser.arriveAndAwaitAdvance()
任务线程的执行将被同步,这样task-0 不会开始它的第二次迭代,直到所有其他任务完成它的第一次迭代并通知Phaser
准备好开始它的第二次迭代。
如果您正在运行的任务的执行时间差异很大,并且如果您想确保较快的线程不会与较慢的线程不同步,这可能会很有用。
对于可能的现实世界应用程序,请考虑运行单独的图形和物理线程的游戏。如果图形线程卡在第 6 帧上,您不想让物理线程计算第 100 帧的数据,并且使用Phaser
一种可能的方法来确保图形和物理线程始终在同一帧上工作时间(而且,如果一个线程比另一个线程慢得多,则较快的线程会优雅地产生 CPU 资源,因此希望较慢的线程可以更快地赶上)。
Phaser在功能上与 CyclicBarrier 和 CountDownLatch 有点相似,但它提供了比它们都更大的灵活性。
在 CyclicBarrier 中,我们曾经在构造函数中注册各方,但 Phaser 为我们提供了随时注册和取消注册各方的灵活性。对于注册方,我们可以使用以下任何一种 -
对于注销方,我们可以使用 -
register-
向此移相器添加/注册一个新的未到达方。它返回
如果 onAdvance() 方法的调用正在进行中,则在返回此方法之前可能会等待其完成。如果此移相器有父移相器,并且此移相器没有注册方,则此子移相器也向其父移相器注册。
演示 Phaser 用法的程序>
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) {
/*Creates a new phaser with 1 registered unArrived parties
* and initial phase number is 0
*/
Phaser phaser=new Phaser(1);
System.out.println("new phaser with 1 registered unArrived parties"
+ " created and initial phase number is 0 ");
//Create 3 threads
Thread thread1=new Thread(new MyRunnable(phaser,"first"),"Thread-1");
Thread thread2=new Thread(new MyRunnable(phaser,"second"),"Thread-2");
Thread thread3=new Thread(new MyRunnable(phaser,"third"),"Thread-3");
System.out.println("\n--------Phaser has started---------------");
//Start 3 threads
thread1.start();
thread2.start();
thread3.start();
//get current phase
int currentPhase=phaser.getPhase();
/*arriveAndAwaitAdvance() will cause thread to wait until current phase
* has been completed i.e. until all registered threads
* call arriveAndAwaitAdvance()
*/
phaser.arriveAndAwaitAdvance();
System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");
//------NEXT PHASE BEGINS------
currentPhase=phaser.getPhase();
phaser.arriveAndAwaitAdvance();
System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");
/* current thread Arrives and deRegisters from phaser.
* DeRegistration reduces the number of parties that may
* be required to advance in future phases.
*/
phaser.arriveAndDeregister();
//check whether phaser has been terminated or not.
if(phaser.isTerminated())
System.out.println("\nPhaser has been terminated");
}
}
class MyRunnable implements Runnable{
Phaser phaser;
MyRunnable(Phaser phaser,String name){
this.phaser=phaser;
this.phaser.register(); //Registers/Add a new unArrived party to this phaser.
System.out.println(name +" - New unarrived party has "
+ "been registered with phaser");
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" - party has arrived and is working in "
+ "Phase-"+phaser.getPhase());
phaser.arriveAndAwaitAdvance();
//Sleep has been used for formatting output
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//------NEXT PHASE BEGINS------
System.out.println(Thread.currentThread().getName() +
" - party has arrived and is working in "
+ "Phase-"+phaser.getPhase());
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
}
}
bulkRegister -
将新的未到达方的参与方数量添加到此移相器。它返回
如果 onAdvance() 方法的调用正在进行中,则在返回此方法之前可能会等待其完成。
到达AndDeregister- 当前线程(Party)从移相器到达和取消注册。取消注册减少了未来可能需要进入下一阶段的参与方数量。
如果此移相器有父移相器,并且此移相器没有注册方,则此子移相器也向其父移相器注册。 演示父子移相器的程序
import java.util.concurrent.Phaser;
public class PhaserParentChildTest {
public static void main(String[] args) {
/*
* Creates a new phaser with no registered unArrived parties.
*/
Phaser parentPhaser = new Phaser();
/*
* Creates a new phaser with the given parent &
* no registered unArrived parties.
*/
Phaser childPhaser = new Phaser(parentPhaser,0);
childPhaser.register();
System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());
childPhaser.arriveAndDeregister();
System.out.println("\n--childPhaser has called arriveAndDeregister()-- \n");
System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());
}
}
在http://www.javamadesoeasy.com/2015/03/phaser-in-java_21.html上阅读有关 Phaser 的更多信息