所以,我有这个非常灵活的 Phaser,但似乎我错过了一些东西。
我已经成功使用了 CyclicBarrier,但现在我也想要更灵活的东西,就像我说的那样。所以这里是代码:
声明:
private static final CountDownLatch synchronizer = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);
代码:
try {
logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);
final int peerKQueries = sp.getInteger(peerSocket);
peerObjects = new String[peerKQueries];
peerValues = new BigDecimal[peerKQueries];
for ( int i = 0; i < peerObjects.length; i++ )
peerObjects[i] = sp.getString(peerSocket);
for ( int i = 0; i < peerValues.length; i++ )
peerValues[i] = sp.getBigDecimal(peerSocket);
final int phase1a = htPhaser1a.arrive();
if ( phase1a < 0 ) {
logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because it arrived lately for Phase 1a!", true);
sp.close(peerSocket);
throw new IllegalThreadStateException();
} else {
logger.INFO(pID + " -> Arrived in HT phase 1a. Total arrivals: "+htPhaser1a.getArrivedParties(), true);
logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1b/2 (phase number is "+phase1a+").", true);
// The last peer should also unblock the barrier.
if ( htPhaser1a.getArrivedParties() == TOTAL_PEERS.get() ) {
htPhaser1a.arrive();
synchronizer.countDown();
}
htPhaser1a.awaitAdvanceInterruptibly(phase1a, 30, TimeUnit.SECONDS);
}
} catch (IOException e) {
logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
sp.close(peerSocket);
throw new IllegalThreadStateException();
} catch (TimeoutException e) {
logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
if ( HAS_TIMED_OUT.compareAndSet(false, true) ) {
logger.INFO("Parties NOT arrived in the timeout: "+(htPhaser1a.getUnarrivedParties()-1), true);
resetCriticalData(htPhaser1a.getArrivedParties());
htPhaser1a.forceTermination();
instantiateHTPhase1b();
instantiateHTPhase2();
instantiateHTPatch();
synchronizer.countDown();
}
} finally {
logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
synchronizer.await();
logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);
}
sp.getSomething();
是 I/O 调用。
考虑到此代码示例由多个线程运行。
这是我的问题:我已确保不会超过 MAX_CLIENTS 到达移相器,所以如果 MAX_CLIENTS 到达,一切都很好。但是,我遇到了 TimeoutException 的问题。第一个是客户端(比如线程 A)将能够到达阶段的时间窗口(又名竞争条件),然后线程 B 中发生 TimeoutException,我正在线程 B 中动态实例化另一个带有到达方数量的移相器(比如 5),但随后线程 A 已经到达阶段(又名 phase1a 未发现 < 0)。我该如何纠正?我正在考虑使用信号量,但我认为这不值得努力,因为那样我可能需要重新考虑我这样做的方式。我还考虑过使用 Timer 并增加一个AtomicInteger
变量和当定时器到期时动态实例化Phaser。关于如何解决这个问题的任何想法?
编辑:
文档有一种bulkRegister(int parties)
方法,但措辞有点奇怪:
将给定数量的新未到达方添加到此移相器。如果正在进行的 onAdvance(int, int) 调用正在进行中,则此方法可能会在返回之前等待其完成。如果此移相器有父移相器,并且给定的参与方数量大于零,并且此移相器之前没有注册参与方,则此子移相器也向其父移相器注册。如果此移相器终止,则注册尝试无效,并返回负值。
问题: “可能”这个词让我感到困惑!“可能”如可能或“可能”
编辑:
解决。在下面检查我的答案。