5

我正在寻找一种在 java 中执行批量任务的方法。这个想法是有一个ExecutorService基于线程池的线程池,它允许我在一个线程Callable的不同线程之间传播一组main。此类应提供一个waitForCompletion方法,该方法将使main线程进入睡眠状态,直到所有任务都执行完毕。然后main线程应该被唤醒,它会执行一些操作并重新提交一组任务。

这个过程会重复很多次,所以我想使用ExecutorService.shutdown它,因为这需要创建多个ExecutorService.

AtomicInteger目前我已经使用 a和 a Lock/以下列方式实现了它Condition

public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
  private final AtomicInteger mActiveCount;
  private final Lock          mLock;
  private final Condition     mCondition;

  public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
    ...
    for(C task : batch){
      submit(task);
      mActiveCount.incrementAndGet();
    }
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    mLock.lock();
    if (mActiveCount.decrementAndGet() == 0) {
      mCondition.signalAll();
    }
    mLock.unlock();
  }

  public void awaitBatchCompletion() throws InterruptedException {
    ...
    // Lock and wait until there is no active task
    mLock.lock();
    while (mActiveCount.get() > 0) {
      try {
        mCondition.await();
      } catch (InterruptedException e) {
        mLock.unlock();
        throw e;
      }
    }
    mLock.unlock();
  } 
}

请注意,我不一定会一次提交批次中的所有任务,因此CountDownLatch似乎不是一个选择。

这是一种有效的方法吗?有没有更有效/优雅的方式来实现它?

谢谢

4

3 回答 3

8

我认为ExecutorService本身将能够执行您的要求。

调用invokeAll([...])并遍历您的所有任务。如果您可以遍历所有期货,则所有任务都已完成。

于 2012-04-24T12:59:22.497 回答
3

正如其他答案所指出的那样,您的用例中似乎没有任何部分需要自定义 ExecutorService。

在我看来,您需要做的就是提交一个批处理,等待它们全部完成,同时忽略主线程上的中断,然后可能根据第一批的结果提交另一个批处理。我相信这只是一个问题:

    ExecutorService service = ...;

    Collection<Future> futures = new HashSet<Future>();
    for (Callable callable : tasks) {
        Future future = service.submit(callable);
        futures.add(future);
    }

    for(Future future : futures) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        }
    }

    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.
于 2012-04-24T13:32:49.933 回答
0

我同意@ckuetbach 的观点,默认 JavaExecutors应该为您提供执行“批处理”作业所需的所有功能。

如果我是你,我只会提交一堆工作,等待他们完成,ExecutorService.awaitTermination()然后开始一个新的ExecutorService. 这样做以节省“线程创建”是过早的优化,除非您每秒执行 100 次或其他操作。

如果您真的坚持对ExecutorService每个批次都使用相同的,那么您可以自己分配一个ThreadPoolExecutor,然后循环查看ThreadPoolExecutor.getActiveCount(). 就像是:

BlockingQueue jobQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
    0L, TimeUnit.MILLISECONDS, jobQueue);
// submit your batch of jobs ...
// need to wait a bit for the jobs to start
Thread.sleep(100);
while (executor.getActiveCount() > 0 && jobQueue.size() > 0) {
    // to slow the spin
    Thread.sleep(1000);
}
// continue on to submit the next batch
于 2012-04-24T13:10:08.833 回答