34

我使用 anExecutorService来执行任务。该任务可以递归地创建提交给相同的其他任务,ExecutorService并且这些子任务也可以这样做。

我现在有一个问题,我想等到所有任务都完成(即所有任务都完成并且他们没有提交新任务)后再继续。

我无法调用ExecutorService.shutdown()主线程,因为这会阻止新任务被ExecutorService.

如果没有被调用,调用ExecutorService.awaitTermination()似乎什么都不做。shutdown

所以我有点卡在这里。ExecutorService看到所有工人都处于闲置状态并不是那么难,不是吗?我能想出的唯一不优雅的解决方案是直接使用 a并每隔一段时间ThreadPoolExecutor查询一次。getPoolSize()真的没有更好的方法吗?

4

11 回答 11

19

这确实是 Phaser 的理想人选。Java 7 即将推出这个新类。它是一个灵活的 CountdonwLatch/CyclicBarrier。您可以在JSR 166 Interest Site获得稳定版本。

它是更灵活的 CountdownLatch/CyclicBarrier 的方式是因为它不仅能够支持未知数量的方(线程),而且还可以重用(这就是阶段部分的来源)

对于您提交的每项任务,您都会注册,当该任务完成时,您就会到达。这可以递归地完成。

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

编辑:感谢@depthofreality 在我之前的示例中指出竞争条件。我正在更新它,以便执行线程只等待当前阶段的推进,因为它阻塞递归函数完成。

arrive相数直到s == s的数量才会跳闸register。由于在每个递归调用调用之前,register当所有调用完成时都会发生阶段增量。

于 2011-02-10T14:43:07.890 回答
17

如果递归任务树中的任务数量最初是未知的,也许最简单的方法是实现您自己的同步原语,某种“反向信号量”,并在您的任务之间共享它。在提交每个任务之前,您会增加一个值,当任务完成时,它会减少该值,然后等待该值变为 0。

将其实现为从任务显式调用的单独原语可以将此逻辑与线程池实现分离,并允许您将多个独立的递归任务树提交到同一个池中。

像这样的东西:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

请注意,taskCompleted()应该在finally块内调用,以使其免受可能的异常的影响。

另请注意,beforeSubmit()应在提交任务之前由提交线程调用,而不是由任务本身调用,以避免在旧任务完成而新任务尚未启动时可能出现的“错误完成”。

编辑:修复了使用模式的重要问题。

于 2011-02-10T14:37:34.807 回答
7

哇,你们好快:)

感谢您的所有建议。Futures 不容易与我的模型集成,因为我不知道事先安排了多少可运行对象。因此,如果我保持父任务活着只是为了等待它的递归子任务完成,我周围会有很多垃圾。

我使用 AtomicInteger 建议解决了我的问题。本质上,我将 ThreadPoolExecutor 子类化,并在调用 execute() 时递增计数器,并在调用 afterExecute() 时递减计数器。当计数器变为 0 时,我调用 shutdown()。这似乎对我的问题有用,不确定这是否是一种普遍的好方法。特别是,我假设您只使用 execute() 来添加 Runnables。

作为一个侧节点:我首先尝试在 afterExecute() 中检查队列中的 Runnable 数量以及当它们为 0 时处于活动状态和关闭的工作人员数量;但这不起作用,因为并非所有 Runnables 都出现在队列中,并且 getActiveCount() 也没有达到我的预期。

无论如何,这是我的解决方案:(如果有人对此有严重问题,请告诉我:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}
于 2011-02-10T15:16:00.707 回答
3

您可以创建自己的线程池来扩展ThreadPoolExecutor。您想知道任务何时提交以及何时完成。

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}
于 2011-05-01T10:27:07.190 回答
1

为您的任务使用Future(而不是提交Runnable),回调在完成时更新其状态,因此您可以使用Future.isDone跟踪所有任务的状态。

于 2011-02-10T14:35:08.927 回答
0

使用CountDownLatch。将 CountDownLatch 对象传递给您的每个任务,并对您的任务进行如下编码。

public void doTask() {
    // do your task
    latch.countDown(); 
}

而需要等待的线程应该执行以下代码:

public void doWait() {
    latch.await();
}

但是,当然,这假设您已经知道子任务的数量,以便您可以初始化锁存器的计数。

于 2011-02-10T14:33:53.690 回答
0

(mea culpa:它有点过了我的就寝时间;)但这是动态闩锁的第一次尝试):

package oss.alphazero.sto4958330;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DynamicCountDownLatch {
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final CountDownLatch toplatch;
        public Sync() {
            setState(0);
            this.toplatch = new CountDownLatch(1);
        }

        @Override
        protected int tryAcquireShared(int acquires){
            try {
                toplatch.await();
            } 
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted", e);
            }
            return getState() == 0 ? 1 : -1;
        }
        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) 
                    return nextc == 0;
            }
        }
        public boolean tryExtendState(int acquires) {
            for (;;) {
                int s = getState();
                int exts = s+1;
                if (compareAndSetState(s, exts)) {
                    toplatch.countDown();
                    return exts > 0;
                }
            }
        }
    }
    private final Sync sync;
    public DynamicCountDownLatch(){
        this.sync = new Sync();
    }
    public void await() 
        throws InterruptedException   
    {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit   unit) 
        throws InterruptedException   
    {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void join() {
        sync.tryExtendState(1);
    }
}

这个锁存器为现有的(克隆的)CountDownLatch API 引入了一个新方法 join(),任务使用它来表示它们进入更大的任务组。

闩锁从父任务传递到子任务。根据 Suraj 的模式,每个任务将首先“加入()”锁存器,执行其任务(),然后执行 countDown()。

为了解决主线程启动任务组然后立即 awaits() 的情况——在任何任务线程有机会甚至 join() 之前——topLatch使用 int 内部Sync类。这是一个锁存器,每次 join() 都会被倒计时;当然,只有第一个倒计时很重要,因为所有后续倒计时都是 nops。

上面的初始实现确实引入了某种语义皱纹,因为 tryAcquiredShared(int) 不应该抛出 InterruptedException,但我们确实需要处理 topLatch 上的等待中断。

这是对 OP 自己使用 Atomic 计数器的解决方案的改进吗?我会说可能不是 IFF,他坚持使用 Executors,但我相信,在这种情况下,它是使用 AQS 的同样有效的替代方法,并且也可用于通用线程。

击退其他黑客。

于 2011-05-01T07:19:18.863 回答
0

如果您想使用 JSR166y 类 - 例如 Phaser 或 Fork/Join - 其中任何一个都可能适合您,您可以随时从以下网址下载它们的 Java 6 反向移植:http: //gee.cs.oswego.edu/dl/concurrency -interest/并以此为基础,而不是编写一个完全自制的解决方案。然后当 7 出现时,您可以删除对 backport 的依赖并更改一些包名称。

(完全披露:我们已经在 prod 中使用 LinkedTransferQueue 一段时间了。没有问题)

于 2011-05-03T09:39:33.197 回答
0

我必须说,上面描述的递归调用任务和等待结束子任务问题的解决方案并不能让我满意。我的解决方案灵感来自 Oracle 的原始文档:CountDownLatch和示例:人力资源 CountDownLatch

HRManagerCompact 类实例中的第一个公共线程有两个子线程的等待锁存器,其后续的两个子线程有等待锁存器......等等。

当然,latch 可以设置为不同于 2 的值(在 CountDownLatch 的构造函数中),也可以在迭代中建立可运行对象的数量,即 ArrayList,但必须对应(倒计时的数量必须等于参数在 CountDownLatch 构造函数中)。

请注意,锁存器的数量根据限制条件呈指数增长:'level.get() < 2',以及对象的数量。1, 2, 4, 8, 16... 和锁存器 0, 1, 2, 4... 如您所见,对于四个级别(level.get() < 4),将有 15 个等待线程和 7 个锁存器在那个时候,峰值 16 个线程正在运行。

package processes.countdownlatch.hr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Recursively latching running classes to wait for the peak threads
 *
 * @author hariprasad
 */
public class HRManagerCompact extends Thread {
  final int N = 2; // number of daughter's tasks for latch
  CountDownLatch countDownLatch;
  CountDownLatch originCountDownLatch;
  AtomicInteger level = new AtomicInteger(0);
  AtomicLong order = new AtomicLong(0); // id latched thread waiting for

  HRManagerCompact techLead1 = null;
  HRManagerCompact techLead2 = null;
  HRManagerCompact techLead3 = null;

// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
    AtomicInteger level, AtomicLong order){
  super(name);
  this.originCountDownLatch=countDownLatch;
  this.level = level;
  this.order = order;
 }

 private void doIt() {
    countDownLatch = new CountDownLatch(N);
    AtomicInteger leveli = new AtomicInteger(level.get() + 1);
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);

    techLead1.start();
    techLead2.start();
    //techLead3.start();

    try {
     synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
       System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
       countDownLatch.await(); // wait actual thread
     }
     System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
  }

 @Override
 public void run() {
  try {
   System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
   Thread.sleep(10*level.intValue());
   if (level.get() < 2) doIt();
   Thread.yield();
  }
  catch (Exception e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  /*catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }*/
  // TODO Auto-generated method stub
  System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
  originCountDownLatch.countDown(); // count down
 }

 public static void main(String args[]){
  AtomicInteger levelzero = new AtomicInteger(0);
  HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
  hr.doIt();
 }
}

可能的注释输出(有一定的可能性):

first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13  // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15  // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11  // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1  // latch on 1 opened
于 2014-12-26T16:24:14.847 回答
0

我能想出的唯一不优雅的解决方案是直接使用 ThreadPoolExecutor 并每隔一段时间查询它的 getPoolSize() 。真的没有更好的方法吗?

您必须以正确的顺序使用shutdown() ,awaitTermination()and shutdownNow()方法。

shutdown():启动有序关闭,其中执行先前提交的任务,但不会接受新任务。

awaitTermination(): 阻塞,直到所有任务在关闭请求后完成执行,或发生超时,或当前线程被中断,以先发生者为准。

shutdownNow():尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

来自ExecutorService的 oracle 文档页面的推荐方式:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

如果完成任务的持续时间较长,您可以将 if 条件替换为 while 条件,如下所示:

改变

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

 while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     Thread.sleep(60000);
 }  

您可以参考其他替代方案(除了join()可以与独立线程一起使用的):

等到所有线程在java中完成它们的工作

于 2016-09-02T15:05:32.477 回答
0

您可以使用跟踪正在运行的线程的运行器:

Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

要使用它,您只需添加一个依赖项:

编译'com.github.matejtymes:javafixes:1.3.0'

于 2019-06-20T23:27:00.787 回答