0

终止并行收集的最佳方法是什么(在线程之一引发异常或用户发起中断的情况下)?

无论哪种情况,我都可以轻松设置一些标志,然后在循环顶部检查它。但是,如果我的集合中有 10,000 件物品,我宁愿告诉将它们送入 ForkJoinPool 的任何东西停止送入它们。让已经开始的 5 或 20 件左右完成,但不要再开始了。

这是一个示例,您可以插入以了解我的意思。如果我模拟其中一个线程抛出异常,则看起来集合停止处理,因为线程计数的第一次打印(在“A:”消息处)通常非常小(5 左右)。但是,通过在 GParsPool.withPool 之外打印计数(在“B:”处),我可以看到它们确实一直在运行(始终都是 100):

void doIt() {
    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    try {
        GParsPool.withPool { pool ->

            try {
                (1..100).eachParallel {
                    try {
                        if (threadCounter.incrementAndGet() == 1) {
                            throw new RuntimeException('planet blew up!')
                        }

                        // Do some long work
                        Integer counter=0
                        (1..1000000).each() {counter++}

                        // Flag if we went all the way through
                        threadCounterEnd.incrementAndGet()
                    } catch (Exception exc) {
                        //pool.shutdownNow()
                        throw new RuntimeException(exc)
                    }
                }
            } catch (Exception exc) {
                println 'caught exception'
                //pool.shutdownNow()
                println "A: threads launched was ${threadCounter}, ended was ${threadCounterEnd}"
                throw new RuntimeException(exc)
            }
        }
    } catch (Exception exc) {
        exc.printStackTrace()
    }

    println "B: threads launched was ${threadCounter}, ended was ${threadCounterEnd}"
}

如果我在 eachParallel 中使用 pool.shutdown(),它没有影响。如果我使用 pool.shutdownNow(),它会像我想要的那样结束处理,但会抛出一个 CancellationException,这会掩盖我想要的真正异常。我可以将“真正的”异常存储在一个变量中以供以后访问,但我想知道是否没有更好的方法来告诉并行集合干净地停止。

4

1 回答 1

2

没有这种机制的ATM。由于池无法区分您的任务和其他潜在计算的任务,因此它没有阻止线程读取任务队列的机制。如果您知道您是线程池的唯一用户,则可以按照您的建议使用 shutdownNow()(具有上述后果)。

我觉得您也知道的另一个选项是直接正确捕获任务中的异常并设置共享原子标志以指示其他任务立即退出。

于 2012-12-13T10:38:42.467 回答