5

在取消ForkJoinPool返回的 Future 时,我刚刚注意到以下现象。给定以下示例代码:

ForkJoinPool pool = new ForkJoinPool();
Future<?> fut = pool.submit(new Callable<Void>() {

  @Override
  public Void call() throws Exception {
    while (true) {
      if (Thread.currentThread().isInterrupted()) { // <-- never true
        System.out.println("interrupted");
        throw new InterruptedException();
      }
    }
  }
});

Thread.sleep(1000);
System.out.println("cancel");
fut.cancel(true);

该程序从不打印interrupted. ForkJoinTask#cancel(boolean)的文档说:

mayInterruptIfRunning - 这个值在默认实现中没有影响,因为中断不用于控制取消。

如果 ForkJoinTasks 忽略中断,您还应该如何检查提交给 ForkJoinPool 的 Callables 中的取消?

4

2 回答 2

7

发生这种情况是因为Future<?>is a ForkJoinTask.AdaptedCallablewhich extends ForkJoinTask,其取消方法是:

public boolean cancel(boolean mayInterruptIfRunning) {
    return setCompletion(CANCELLED) == CANCELLED;
}

private int setCompletion(int completion) {
    for (int s;;) {
        if ((s = status) < 0)
            return s;
        if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
            if (s != 0)
                synchronized (this) { notifyAll(); }
            return completion;
        }
    }
}

它不做任何中断,它只是设置状态。我想这是因为ForkJoinPoolssFuture可能有一个非常复杂的树结构,并且不清楚以什么顺序取消它们。

于 2014-01-24T06:49:21.533 回答
1

在@Mkhail 答案之上分享更多亮点:

使用 ForkJoinPool execute() 而不是 submit() 会强制一个失败的 Runnable 抛出一个 worker 异常,这个异常会被Thread UncaughtExceptionHandler 捕获。

取自 Java 8 代码:
提交使用的是 AdaptedRunnableAction()。
执行正在使用 RunnableExecuteAction() (请参阅rethrow(ex))。

 /**
 * Adaptor for Runnables without results
 */
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}

/**
 * Adaptor for Runnables in which failure forces worker exception
 */
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        rethrow(ex); // rethrow outside exec() catches.
    }
    private static final long serialVersionUID = 5232453952276885070L;
}
于 2018-05-13T16:53:11.117 回答