2

ForkJoinPool使用提交任务(RecursiveAction或)时处理异常(未捕获)的更好方法是什么RecursiveTask

ForkJoinPoolThread.UncaughtExceptionHandler在 WorkerThread 突然终止时接受 a 来处理异常(这无论如何都不受我们的控制),但是在ForkJoinTask抛出异常时不使用此处理程序。我在我的实现中使用标准submit/invokeAll方式。

这是我的场景:

我有一个线程在无限循环中运行,从第 3 方系统读取数据。在此线程中,我将任务提交给ForkJoinPool

new Thread() {
      public void run() {
         while (true) {
             ForkJoinTask<Void> uselessReturn = 
                   ForkJoinPool.submit(RecursiveActionTask);
         }
      }
 }

我正在使用 RecursiveAction,在少数情况下使用 RecursiveTask。这些任务使用submit()方法提交给 FJPool。我想要一个通用的异常处理程序,类似于UncaughtExceptionHandler如果任务抛出未经检查/未捕获的异常,我可以处理异常并在需要时重新提交任务。处理异常还可以确保如果一个/一些任务抛出异常,排队的任务不会被取消。

invokeAll()方法返回一组 ForkJoinTasks 但这些任务在递归块中(每个任务调用该compute()方法并且可以进一步拆分 [假设场景])

class RecursiveActionTask extends RecursiveAction {

    public void compute() {
       if <task.size() <= ACCEPTABLE_SIZE) {
          processTask() // this might throw an checked/unchecked exception
       } else {
          RecursiveActionTask[] splitTasks = splitTasks(tasks)
          RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
          // the below code never executes as invokeAll submits the tasks to the pool 
          // and the flow never comes to the code below.
          // I am looking for some handling like this
          for (RecusiveActionTask task : returnedTasks) {
             if (task.isDone()) {
                task.getException() // handle this exception
             }
          }
       }
    }

}

我注意到当 3-4 个任务失败时,整个队列提交单元被丢弃。目前我已经放了一个try/catch我个人不喜欢的围绕 processTask 。我正在寻找更通用的。

  1. 我还想知道所有失败的任务列表,以便我可以重新提交它们
  2. 当任务抛出异常时,线程是否会从池中被逐出(尽管我的分析发现它们没有 [但不确定])?
  3. 在 FutureTask 上调用get()方法更有可能让我的流程按顺序等待,直到任务完成。
  4. 我只想知道任务失败的状态。我不在乎它什么时候完成(显然不想等一个小时后)

任何想法如何处理上述场景中的异常?

4

2 回答 2

2

这就是我们在 Akka 中解决它的方法:

/**
 * INTERNAL AKKA USAGE ONLY
 */
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
  final override def setRawResult(u: Unit): Unit = ()
  final override def getRawResult(): Unit = ()
  final override def exec(): Boolean = try { mailbox.run; true } catch {
    case anything ⇒
      val t = Thread.currentThread
      t.getUncaughtExceptionHandler match {
        case null ⇒
        case some ⇒ some.uncaughtException(t, anything)
      }
      throw anything
  }
 }
于 2012-04-13T09:12:58.567 回答
0

@Rajendra,你做的一切都是对的,除了你应该使用 ForkJoinPool execute() 而不是 submit()。

这样,如果 Runnable 失败,它将强制工作异常,并且它将被您的 UncaughtExceptionHandler 捕获。

我不知道为什么会存在这种行为,但它会起作用!我已经很难学会了:(

取自 Java 8 代码:
提交使用的是 AdaptedRunnableAction()。
执行正在使用 RunnableExecuteAction() (请参阅rethrow(ex))。

 /**
 * Adaptor for Runnables without results
 */
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}

/**
 * Adaptor for Runnables in which failure forces worker exception
 */
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        rethrow(ex); // rethrow outside exec() catches.
    }
    private static final long serialVersionUID = 5232453952276885070L;
}
于 2018-05-13T16:41:37.037 回答