我相信我有解决该问题的方法,经过全面测试后,我在应用程序中实现了它并且它有效。
withPool 闭包传入创建的池(jsr166y.ForkJoinPool)作为第一个参数。我可以抓住它并将其存储在一个变量(currentPool)中,供主线程稍后使用,如下所示:
GParsPool.withPool { pool ->
currentPool = pool
当抛出异常并返回主线程进行处理时,我可以让它等到一切都完成,如下所示:
} catch (Exception exc) {
if (currentPool) {
while (!currentPool.isQuiescent()) {
Thread.sleep(100)
println 'waiting for threads to finish'
}
}
println 'all done'
}
isQuiescent() 似乎是确保没有更多工作要做的安全方法。
请注意,在测试过程中,我还发现异常似乎并没有像我最初想象的那样终止循环的执行。如果我有一个 500 个列表并执行了 eachParallel,那么无论第一个是否有错误,它们都会运行。所以我不得不在并行循环的异常处理程序中使用 currentPool.shutdownNow() 来终止循环。另请参阅:GPars - 提前终止并行收集的正确方法
这是实际解决方案的完整简化表示:
void example() {
jsr166y.ForkJoinPool currentPool
AtomicInteger threadCounter = new AtomicInteger(0)
AtomicInteger threadCounterEnd = new AtomicInteger(0)
AtomicReference<Exception> realException = new AtomicReference<Exception>()
try {
GParsPool.withPool { pool ->
currentPool = pool
(1..500).eachParallel {
try {
if (threadCounter.incrementAndGet() == 1) {
throw new RuntimeException('planet blew up!')
}
if (realException.get() != null) {
// We had an exception already in this eachParallel - quit early
return
}
// Do some long work
Integer counter=0
(1..1000000).each() {counter++}
// Flag if we went all the way through
threadCounterEnd.incrementAndGet()
} catch (Exception exc) {
realException.compareAndSet(null, exc)
pool.shutdownNow()
throw realException
}
}
}
} catch (Exception exc) {
// If we used pool.shutdownNow(), we need to look at the real exception.
// This is needed because pool.shutdownNow() sometimes generates a CancellationException
// which can cover up the real exception that caused us to do a shutdownNow().
if (realException.get()) {
exc = realException.get()
}
if (currentPool) {
while (!currentPool.isQuiescent()) {
Thread.sleep(100)
println 'waiting for threads to finish'
}
}
// Do further exception handling here...
exc.printStackTrace()
}
}
回到我之前的例子,如果我在 4 核机器上第一次抛出异常,大约有 5 个线程排队。shutdownNow() 将在大约 20 个左右的线程通过后切断事物,因此在顶部附近进行“提前退出”检查有助于这 20 个左右的线程尽快退出。
只是把它贴在这里以防它帮助别人,以换取我在这里得到的所有帮助。谢谢!