3

我想并行执行多个可调用对象。但似乎 ExecutorService 总是等到所有可调用对象都完成。

我尝试了以下方法:

final int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>();
for(int i = 0; i < nThreads; i++) {
    tasks.add(new PrimeCallable(0, i * 100 + 100, "thread" + i));
}

try {
    for(Future<List<Integer>> result : executorService.invokeAll(tasks)) {
        List<Integer> integers = result.get();
        for(Integer i : integers){
            System.out.println(i);
        }
    }
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

现在,当 executorService 中的所有可调用对象都完成时,将调用 for 循环。据我所知,没有 executorService.isParallel setter ;-)。

让可调用对象并行运行的正确方法是什么?

感谢您的提示!

4

4 回答 4

10

invokeAll的 javadocs说;

执行给定的任务,当全部完成时返回一个持有状态和结果的 Futures 列表。Future.isDone() 对于返回列表的每个元素都是 true。

所以invokeAll阻塞直到集合中的每个任务完成。

于 2011-06-14T15:28:02.930 回答
6

Executor 服务并行运行所有可调用对象。它所做的只是等待所有并行任务完成,然后再继续。所以它不像所有任务都是串行运行的。

于 2011-06-14T14:44:43.370 回答
3

听起来您想要的一部分是延迟执行-您不希望在提取结果之前必须在内存中复制结构。

我会将其视为迭代+转换问题。首先,在您的输入上定义一个迭代器,这样每次调用 next() 都会返回一个 Callable,它将在您的系列中产生下一个值。

转换阶段是对这些 Callables 应用并行或并发评估,如下所示(未测试):

public class ConcurrentTransform
{
  private final ExecutorService executor;
  private final int maxBuffer;

  public ConcurrentTransform(ExecutorService executor, int maxWorkBuffer) {
    this.executor = executor;
    this.maxBuffer = Math.max(1, maxWorkBuffer);
  }

  public <T> Iterator<T> apply(final Iterator<Callable<T>> input) {
    // track submitted work
    final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>();

    // submit first N tasks
    for (int i=0; i<maxBuffer && input.hasNext(); i++) {
      Callable<T> task = input.next();
      Future<T> future = executor.submit(task);
      submitted.add(future);
    }

    return new Iterator<T>(){
      @Override
      public synchronized boolean hasNext() {
        return !submitted.isEmpty();
      }
      @Override
      public T next() {
        Future<T> result;
        synchronized (this) {
          result = submitted.poll();
          if (input.hasNext()) {
            submitted.add(executor.submit(input.next()));
          }
        }

        if (result != null) {
          try {
            return result.get(); // blocking
          } catch (Exception e) {
            if (e instanceof RuntimeException) {
               throw (RuntimeException) e;
            } else {
               throw new RuntimeException(e);
            }
          }
        } else {
          throw new NoSuchElementException();
        }
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }};
  }
}

在调用 apply(...) 之后,您将遍历结果值,在幕后将并行执行 Callable 对象并以与输入相同的顺序返回结果。一些改进是允许阻塞 result.get() 调用的可选超时,或者在转换本身内管理线程池。

于 2011-09-08T04:56:03.103 回答
2

如果要在结果发生时查看结果,请使用ExecutorCompletionService.

于 2011-06-14T15:49:50.683 回答