终止并行收集的最佳方法是什么(在线程之一引发异常或用户发起中断的情况下)?
无论哪种情况,我都可以轻松设置一些标志,然后在循环顶部检查它。但是,如果我的集合中有 10,000 件物品,我宁愿告诉将它们送入 ForkJoinPool 的任何东西停止送入它们。让已经开始的 5 或 20 件左右完成,但不要再开始了。
这是一个示例,您可以插入以了解我的意思。如果我模拟其中一个线程抛出异常,则看起来集合停止处理,因为线程计数的第一次打印(在“A:”消息处)通常非常小(5 左右)。但是,通过在 GParsPool.withPool 之外打印计数(在“B:”处),我可以看到它们确实一直在运行(始终都是 100):
void doIt() {
AtomicInteger threadCounter = new AtomicInteger(0)
AtomicInteger threadCounterEnd = new AtomicInteger(0)
try {
GParsPool.withPool { pool ->
try {
(1..100).eachParallel {
try {
if (threadCounter.incrementAndGet() == 1) {
throw new RuntimeException('planet blew up!')
}
// Do some long work
Integer counter=0
(1..1000000).each() {counter++}
// Flag if we went all the way through
threadCounterEnd.incrementAndGet()
} catch (Exception exc) {
//pool.shutdownNow()
throw new RuntimeException(exc)
}
}
} catch (Exception exc) {
println 'caught exception'
//pool.shutdownNow()
println "A: threads launched was ${threadCounter}, ended was ${threadCounterEnd}"
throw new RuntimeException(exc)
}
}
} catch (Exception exc) {
exc.printStackTrace()
}
println "B: threads launched was ${threadCounter}, ended was ${threadCounterEnd}"
}
如果我在 eachParallel 中使用 pool.shutdown(),它没有影响。如果我使用 pool.shutdownNow(),它会像我想要的那样结束处理,但会抛出一个 CancellationException,这会掩盖我想要的真正异常。我可以将“真正的”异常存储在一个变量中以供以后访问,但我想知道是否没有更好的方法来告诉并行集合干净地停止。