1

我正在使用 ExecutorCompletionService 提交一些任务。然后我想等待最大值,比如 5 秒,然后停止处理。

ExecutorService executorService = Executors.newFixedThreadPool(10);     
CompletionService<String> completionService = new ExecutorCompletionService<String>(
            executorService);
List<Callable<String>> callables = createCallables(); //each callable sleeps randomly between 1-10 seconds and then prints the thread name
for (Callable<String> callable : callables) 
    taskCompletionService.submit(callable);
for (int i = 0; i < callables.size(); i++) {
    Future<String> result = completionService.take();   
    System.out.println(result.get()); 
}

现在我不想等待超过 5 秒才能完成所有任务。我只想收集在 5 秒内完成的任务的结果。我怎样才能做到这一点?

executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);

我已经使用shutdownand awaitTerminationon executorService,但我的主线程仍然等待所有提交的任务完成,所有任务完成需要 10 秒并打印每个线程的名称。如何在 5 秒内停止处理?

4

1 回答 1

0

正如您提到的,这里的主要问题是您的代码正在等待任务完成shutdown()才能被调用。本质上这是因为CompletionService.take()将阻塞直到任务完成。此外,您需要跟踪获得任务结果所花费的累计时间,因为CompletionService它不会为您完成。

这个想法是改为使用poll(long, TimeUnit)并将null结果解释为超时到期,之后您可以立即关闭执行程序服务。例如,可以这样做:

try {
  ExecutorService executorService = Executors.newFixedThreadPool(10);     
  CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
  // each callable sleeps randomly between 1-10 seconds and then prints the thread name
  List<Callable<String>> callables = createCallables();
  for (Callable<String> callable : callables) {
    completionService.submit(callable);
  }
  final long timeout = 5_000_000_000L; // 5 seconds in nanos
  long elapsed = 0L;
  int count = 0;
  final long start = System.nanoTime();
  // while not timed out and not all tasks have completed
  while (((elapsed = System.nanoTime() - start) < timeout) && (count < callables.size())) {
    // wait for at most the remaining time before timeout
    Future<String> result = completionService.poll(timeout - elapsed, TimeUnit.NANOSECONDS);
    if (result == null) {
      System.out.println("timed out after " + count + " tasks and " + ((System.nanoTime() - start)/1_000_000L) + " ms");
      break;
    }
    count++;
    System.out.println(result.get()); 
  }
  executorService.shutdownNow();
  System.out.println("done");
} catch (Exception e) {
  e.printStackTrace();
}

我能够测试它是否可以createCallables()像这样实现:

private static List<Callable<String>> createCallables() {
  Random rand = new Random(System.nanoTime());
  List<Callable<String>> list = new ArrayList<>();
  for (int i=0; i<10; i++) {
    // between 1 and 10s
    final long time = 1000L * (1L + rand.nextInt(10));
    list.add(new Callable<String>() {
      @Override
      public String call() throws Exception {
        Thread.sleep(time);
        return "ok after " + time + "s on thread " + Thread.currentThread();
      }
    });
  }
  return list;
}
于 2015-09-27T07:29:32.057 回答