45

如果我们使用ExecutorCompletionService,我们可以将一系列任务作为 s 提交,并获得与asCallable交互的结果。 CompletionServicequeue

但也有invokeAll接受ExecutorService任务Collection的 ,我们得到一个列表Future来检索结果。

据我所知,使用其中一个或另一个没有任何好处(除了我们避免for使用 an 的循环invokeAll,我们必须submit将任务分配给CompletionService),并且它们本质上是相同的想法,但略有不同。

那么为什么有两种不同的方式来提交一系列任务呢?我是否纠正了它们在性能方面是等效的?有没有一种情况比另一种更合适?我想不出一个。

4

4 回答 4

87

使用 a ExecutorCompletionService.poll/take,您Future将按完成顺序(或多或少)收到 s 。使用ExecutorService.invokeAll,你没有这个权力;您要么阻止直到全部完成,要么指定一个超时,在此之后取消不完整的。


static class SleepingCallable implements Callable<String> {

  final String name;
  final long period;

  SleepingCallable(final String name, final long period) {
    this.name = name;
    this.period = period;
  }

  public String call() {
    try {
      Thread.sleep(period);
    } catch (InterruptedException ex) { }
    return name;
  }
}

现在,下面我将演示如何invokeAll工作:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("quick", 500),
    new SleepingCallable("slow", 5000));
try {
  for (final Future<String> future : pool.invokeAll(callables)) {
    System.out.println(future.get());
  }
} catch (ExecutionException | InterruptedException ex) { }
pool.shutdown();

这会产生以下输出:

C:\dev\scrap>java CompletionExample
... after 5 s ...
quick
slow

使用CompletionService,我们看到不同的输出:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("slow", 5000),
    new SleepingCallable("quick", 500));
for (final Callable<String> callable : callables) {
  service.submit(callable);
}
pool.shutdown();
try {
  while (!pool.isTerminated()) {
    final Future<String> future = service.take();
    System.out.println(future.get());
  }
} catch (ExecutionException | InterruptedException ex) { }

这会产生以下输出:

C:\dev\scrap>java CompletionExample
... after 500 ms ...
quick
... after 5 s ...
slow

请注意,时间是相对于程序开始的,而不是前一条消息。


您可以在此处找到两者的完整代码。

于 2012-08-08T20:26:26.600 回答
19

那么为什么有两种不同的方式来提交一系列任务呢?我是否纠正了它们在性能方面是等效的?有没有一种情况比另一种更合适?我想不出一个。

通过使用ExecutorCompletionService,您可以在每个作业完成时立即收到通知。相比之下,在返回 s 集合之前ExecutorService.invokeAll(...)等待所有Future作业完成。这意味着(例如),如果除了一项作业之外的所有作业都在 10 分钟内完成,而一项作业需要 30 分钟,那么您将在 30 分钟内没有任何结果。

// this waits until _all_ of the jobs complete
List<Future<Object>> futures = threadPool.invokeAll(...);

相反,当您使用 a 时ExecutorCompletionService,您将能够在每个作业完成后立即获得作业,这允许您(例如)将它们发送到另一个线程池进行处理,立即记录结果等。

ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Result> compService
      = new ExecutorCompletionService<Result>(threadPool);
for (MyJob job : jobs) {
    compService.submit(job);
}
// shutdown the pool but the jobs submitted continue to run
threadPool.shutdown();
while (true) {
    Future<Result> future;
    // if pool has terminated (all jobs finished after shutdown) then poll() else take()
    if (threadPool.isTerminated()) {
        future = compService.poll();
        if (future == null) {
            break;
        }
    } else {
        // the take() blocks until any of the jobs complete
        // this joins with the jobs in the order they _finish_
        future = compService.take();
    }
    // this get() won't block
    Result result = future.get();
    // you can then put the result in some other thread pool or something
    // to immediately start processing it
    someOtherThreadPool.submit(new SomeNewJob(result));
}
于 2012-08-08T21:00:39.697 回答
3

我实际上从未使用过 ExecutorCompletionService,但我认为这可能比“正常”ExecutorService 更有用的情况是,当您希望按完成顺序接收已完成任务的 Futures 时。使用 invokeAll,您只需获得一个列表,该列表可以在任何给定时间包含未完成和已完成的任务。

于 2012-08-08T20:27:01.563 回答
1

仅考虑结果的顺序进行比较:

当我们使用CompletionService提交的作业完成时,结果将被推送到队列(完成顺序)。那么提交作业的顺序和返回的结果就不再一样了。因此,如果您担心执行任务的顺序,请使用CompletionService

其中 AsinvokeAll返回代表任务的 Futures 列表,其顺序与迭代器为给定任务列表生成的顺序相同,每个任务都已完成。

于 2013-01-14T19:49:10.057 回答