可以在此处找到有关如何使用 Phaser 和遇到的问题的基本概念的 repo:https ://github.com/hipy/phaser/tree/master/src
我一直致力于使用 Phaser 使用 ThreadPools 使 Dijkstra 算法更高效。我用一个循环进行了很多迭代,每次迭代都需要一个 Phaser 来等待 ThreadPool 中的线程完成,然后再继续当前的迭代。
我遇到了一个问题,Phaser 没有正确等待。当我使用 ArriveAndDeregister() 时,Phaser 在每个线程完成后进入终止状态。当我调用 Arrive() 时,未到达方的数量不会减少,因此迭代会卡住。
下面的所有代码都在一个调用一次的 apply() 方法中运行。
下面的代码为要执行的线程创建任务。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool((numberOfThreads));
Phaser phaser = new Phaser();
//Thread class
class ClosestNodeTask implements Runnable {
private int start;
private int end;
private Phaser phaser;
public ClosestNodeTask(int start, int end, Phaser phaser) {
this.start = start;
this.end = end;
this.phaser = phaser;
}
@Override
public void run() {
getNodeShortestDistanced(start, end, phaser); //method calls phaser.arrive() when done
}
}
for (int t = 0; t < numberOfThreads; t++) {
if (nodesModulo > 0 && numberOfThreads == (t + 1)) {
start = nodesPerThread * (t);
end = nodesPerThread * (t + 1) + nodesModulo;
tasks[t] = new ClosestNodeTask(start, end, phaser);
} else {
start = nodesPerThread * t;
end = nodesPerThread * (t + 1);
tasks[t] = new ClosestNodeTask(start, end, phaser);
}
}
下面的代码在 for 循环中为每次迭代执行。在这种情况下,有 30.000 次迭代:
phaser.register(); //register main thread
for(int t = 0; t < tasks.length; t++) {
phaser.register();
}
System.out.println("Phaser unarrived party size is now: " + phaser.getUnarrivedParties());
跳过一些算法代码,在 for 循环中执行以下代码,启动线程并等待它完成:
for(int t = 0; t < tasks.length; t++) {
executor.execute(tasks[t]);
}
phaser.arriveAndAwaitAdvance();
输出如下:
Phasecount: 0
Phaser unarrived party size is now: 3
Task size: 2
Adding: 613 //Next closest node in a sub-group, result of work done in a thread
Adding: 2870
all tasks done
-----------------done-------------
Phasecount: 1
Phaser unarrived party size is now: 6
Task size: 2
Adding: 1
Adding: 2870
第一阶段执行,进行完整的迭代。第二阶段卡住了。未到达的派对数量为 6。3 个新派对和显然 3 个旧派对,即使我调用了 phaser.arrive() 也没有注册为到达。ArriveAndAwaitAdvance() 也没有等待,因为在下一次迭代中有 6 个未到达方而不是 3 个。
我尝试使用到达AndDeregister() 但这会导致阶段终止(阶段计数有一个很大的负值)。
我怎么能解决这个问题?我不想终止一个阶段,但我确实想在每次迭代时将各方注册为到达。
谢谢!