1

我正在尝试使用 ForkJoinPool 中的 awaitQuiescence 方法等待所有提交的任务完成,或者如果超时后任务尚未完成,则返回 false。

实际上所有提交的任务都可以向池中添加额外的任务,所以我不能使用 awaitTermination 方法,因为这会阻止提交这些额外的任务。但是,awaitQuiescence 不会返回任何内容,即使指定的时间已经结束。

我试图在下面的代码中明确这个问题。CountDownLatch.await 永远不会被触发,但是为什么 awaitQuiescence 方法不返回 false 呢?

public static void main(String[] args) {
    final ForkJoinPool test = new ForkJoinPool(1,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory, null,true);
    final CountDownLatch latch = new CountDownLatch(1);

    test.execute(() -> { 
            try {
                System.out.println("Sleeping");
                Future<Double> f = test.submit(() -> { 
                        latch.await(); 
                        return 0d; 
                    });
                System.out.println(f.get());
                System.out.println("Waking up");
    } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            });

    System.out.println(test.awaitQuiescence(1, TimeUnit.SECONDS));
} 

非常感谢!

4

1 回答 1

1

为什么 awaitQuiescence 方法不返回 false?

似乎在有待处理任务时awaitQuiescence忽略timeout并在调用者的线程中执行任务(参见源代码)。

线程转储:

"ForkJoinPool-1-worker-1" [...] Object.wait() [...]
   java.lang.Thread.State: WAITING (on object monitor)
  [...]
  at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:995)
  [...]
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

[...]

"main" [...] waiting on condition [...]
   java.lang.Thread.State: WAITING (parking)
  [...]
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
  [...]
  at java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1445)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool.awaitQuiescence(ForkJoinPool.java:3097)
  [...]

“主”线程执行第二个任务并等待闩锁,因此awaitQuiescence永远不会终止。

在我看来,这是一个错误。基于javadoc,我假设该方法的最大运行时间(“等待的最大时间”)大约是timeout,但上限实际上更像是所有待处理任务及其所有“后代”的执行时间(可能除了终端)。

另一方面,FJ 池并不完全适用于此类任务(具有非池管理的同步)。从 ForkJoinTask 的javadoc

理想情况下,计算应避免同步方法或块,并且应尽量减少其他阻塞同步,除了加入其他任务或使用同步器(如被宣传为与 fork/join 调度合作的 Phaser)。

[...]

可以定义和使用可能阻塞的 ForkJoinTasks,但这样做需要进一步考虑三个方面: (1) 完成少数任务(如果有的话)应该依赖于阻塞外部同步或 I/O 的任务。从未加入的事件式异步任务(例如,那些继承 CountedCompleter 的任务)通常属于这一类。(2)为了尽量减少资源影响,任务应该很小;理想情况下只执行(可能)阻塞操作。(3) 除非使用 ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的 ForkJoinPool.getParallelism() 级别,否则池不能保证有足够的线程可用以确保进度或良好的性能.

考虑使用ThreadPoolExecutor和/或模拟awaitQuiescence(例如使用Phaser)。可能实现的草图:

class TaskTrackingExecutorService implements ExecutorService {

  private final ExecutorService delegate;
  private final Phaser taskTracker = new Phaser();

  public TaskTrackingExecutorService(ExecutorService delegate) {
    this.delegate = delegate;
  }

  @Override
  public <T> Future<T> submit(Callable<T> task) {
    return delegate.submit(() -> {
      taskTracker.register();
      try {
        return task.call();
      } finally {
        taskTracker.arriveAndDeregister();
      }
    });
  }

  @Override
  public void execute(Runnable command) {
    submit(Executors.callable(command));
  }

  public boolean awaitQuiescence(long timeout, TimeUnit timeUnit) throws InterruptedException {
    taskTracker.register();
    try {
      taskTracker.awaitAdvanceInterruptibly(taskTracker.arriveAndDeregister(), timeout, timeUnit);
      return true;
    } catch (TimeoutException e) {
      return false;
    }
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return delegate.awaitTermination(timeout, unit);
  }

  // rest is similar: either use submit method or the delegate.

}


public class Test {
  public static void main(String[] args) throws InterruptedException {
    TaskTrackingExecutorService pool =
        new TaskTrackingExecutorService(Executors.newCachedThreadPool());
    CountDownLatch latch = new CountDownLatch(1);

    pool.execute(() -> {
          System.out.println("Sleeping");
          Future<Double> f = pool.submit(() -> {
            latch.await();
            return 0d;
          });
          try {
            System.out.println(f.get());
          } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
          System.out.println("Waking up");
        }
    );

    System.out.println(pool.awaitQuiescence(2, TimeUnit.SECONDS));
  }
}
于 2016-12-10T16:53:17.430 回答