1

在线程抛出异常的情况下,我如何才能等到所有未抛出异常的线程都完成(因此用户在一切停止之前不会再次启动)?

我以几种不同的方式使用 GPar,因此我需要为每种方式制定策略(并行集合、异步闭包和 fork/join)。异常不会被掩埋,它们可以通过承诺、getChildrenResults 等得到很好的处理,所以这不是问题(感谢 Vaclav Pech 的回答)。我只需要确保主线程一直等到仍在运行的任何内容完成或以其他方式停止。

例如,当使用并行集合时,一些线程会继续运行,而一些线程在异常发生后永远不会启动。所以很难说有多少人在外面等待,或者有可能抓住他们。

我的猜测是可能有一种方法可以使用线程池(在这种情况下为 GParsPool)。有什么建议么?

谢谢!

4

2 回答 2

3

我相信我有解决该问题的方法,经过全面测试后,我在应用程序中实现了它并且它有效。

withPool 闭包传入创建的池(jsr166y.ForkJoinPool)作为第一个参数。我可以抓住它并将其存储在一个变量(currentPool)中,供主线程稍后使用,如下所示:

    GParsPool.withPool { pool ->
        currentPool = pool

当抛出异常并返回主线程进行处理时,我可以让它等到一切都完成,如下所示:

    } catch (Exception exc) {
        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        println 'all done'
    }

isQuiescent() 似乎是确保没有更多工作要做的安全方法。

请注意,在测试过程中,我还发现异常似乎并没有像我最初想象的那样终止循环的执行。如果我有一个 500 个列表并执行了 eachParallel,那么无论第一个是否有错误,它们都会运行。所以我不得不在并行循环的异常处理程序中使用 currentPool.shutdownNow() 来终止循环。另请参阅:GPars - 提前终止并行收集的正确方法

这是实际解决方案的完整简化表示:

void example() {
    jsr166y.ForkJoinPool currentPool

    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    AtomicReference<Exception> realException = new AtomicReference<Exception>()

    try {
        GParsPool.withPool { pool ->
            currentPool = pool

            (1..500).eachParallel {
                try {
                    if (threadCounter.incrementAndGet() == 1) {
                        throw new RuntimeException('planet blew up!')
                    }

                    if (realException.get() != null) {
                        // We had an exception already in this eachParallel - quit early
                        return
                    }

                    // Do some long work
                    Integer counter=0
                    (1..1000000).each() {counter++}

                    // Flag if we went all the way through
                    threadCounterEnd.incrementAndGet()
                } catch (Exception exc) {
                    realException.compareAndSet(null, exc)

                    pool.shutdownNow()
                    throw realException
                }
            }
        }
    } catch (Exception exc) {
        // If we used pool.shutdownNow(), we need to look at the real exception.
        // This is needed because pool.shutdownNow() sometimes generates a CancellationException
        // which can cover up the real exception that caused us to do a shutdownNow().
        if (realException.get()) {
            exc = realException.get()
        }

        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        // Do further exception handling here...
        exc.printStackTrace()
    }
}

回到我之前的例子,如果我在 4 核机器上第一次抛出异常,大约有 5 个线程排队。shutdownNow() 将在大约 20 个左右的线程通过后切断事物,因此在顶部附近进行“提前退出”检查有助于这 20 个左右的线程尽快退出。

只是把它贴在这里以防它帮助别人,以换取我在这里得到的所有帮助。谢谢!

于 2012-12-11T17:04:07.463 回答
2

我相信您将需要捕获异常,然后返回预期结果以外的其他内容(例如,如果您期望一个数字,则返回 String 或 null),即;

@Grab('org.codehaus.gpars:gpars:0.12')
import static groovyx.gpars.GParsPool.*

def results = withPool {
  [1,2,3].collectParallel {
    try {
      if( it % 2 == 0 ) {
        throw new RuntimeException( '2 fails' )
      }
      else {
        Thread.sleep( 2000 )
        it
      }
    }
    catch( e ) { e.class.name }
  }
}
于 2012-12-07T21:46:56.047 回答