为什么 awaitQuiescence 方法不返回 false?
似乎在有待处理任务时awaitQuiescence
忽略timeout
并在调用者的线程中执行任务(参见源代码)。
线程转储:
"ForkJoinPool-1-worker-1" [...] Object.wait() [...]
java.lang.Thread.State: WAITING (on object monitor)
[...]
at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:995)
[...]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[...]
"main" [...] waiting on condition [...]
java.lang.Thread.State: WAITING (parking)
[...]
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
[...]
at java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1445)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool.awaitQuiescence(ForkJoinPool.java:3097)
[...]
“主”线程执行第二个任务并等待闩锁,因此awaitQuiescence
永远不会终止。
在我看来,这是一个错误。基于javadoc,我假设该方法的最大运行时间(“等待的最大时间”)大约是timeout
,但上限实际上更像是所有待处理任务及其所有“后代”的执行时间(可能除了终端)。
另一方面,FJ 池并不完全适用于此类任务(具有非池管理的同步)。从 ForkJoinTask 的javadoc:
理想情况下,计算应避免同步方法或块,并且应尽量减少其他阻塞同步,除了加入其他任务或使用同步器(如被宣传为与 fork/join 调度合作的 Phaser)。
[...]
可以定义和使用可能阻塞的 ForkJoinTasks,但这样做需要进一步考虑三个方面: (1) 完成少数任务(如果有的话)应该依赖于阻塞外部同步或 I/O 的任务。从未加入的事件式异步任务(例如,那些继承 CountedCompleter 的任务)通常属于这一类。(2)为了尽量减少资源影响,任务应该很小;理想情况下只执行(可能)阻塞操作。(3) 除非使用 ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的 ForkJoinPool.getParallelism() 级别,否则池不能保证有足够的线程可用以确保进度或良好的性能.
考虑使用ThreadPoolExecutor
和/或模拟awaitQuiescence
(例如使用Phaser)。可能实现的草图:
class TaskTrackingExecutorService implements ExecutorService {
private final ExecutorService delegate;
private final Phaser taskTracker = new Phaser();
public TaskTrackingExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(() -> {
taskTracker.register();
try {
return task.call();
} finally {
taskTracker.arriveAndDeregister();
}
});
}
@Override
public void execute(Runnable command) {
submit(Executors.callable(command));
}
public boolean awaitQuiescence(long timeout, TimeUnit timeUnit) throws InterruptedException {
taskTracker.register();
try {
taskTracker.awaitAdvanceInterruptibly(taskTracker.arriveAndDeregister(), timeout, timeUnit);
return true;
} catch (TimeoutException e) {
return false;
}
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
// rest is similar: either use submit method or the delegate.
}
public class Test {
public static void main(String[] args) throws InterruptedException {
TaskTrackingExecutorService pool =
new TaskTrackingExecutorService(Executors.newCachedThreadPool());
CountDownLatch latch = new CountDownLatch(1);
pool.execute(() -> {
System.out.println("Sleeping");
Future<Double> f = pool.submit(() -> {
latch.await();
return 0d;
});
try {
System.out.println(f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("Waking up");
}
);
System.out.println(pool.awaitQuiescence(2, TimeUnit.SECONDS));
}
}