15

我需要提交一些任务,然后等待它们,直到所有结果都可用。他们每个人都将 a 添加String到 a Vector(默认情况下是同步的)。然后我需要为 Vector 中的每个结果启动一个新任务,但只有在所有以前的任务都停止工作时我才需要这样做。

我想使用 Java Executor,特别是我尝试使用Executors.newFixedThreadPool(100)以使用固定数量的线程(我有可变数量的任务,可以是 10 或 500),但我是执行器的新手,我不知道如何等待任务终止。这就像我的程序需要做的伪代码:

ExecutorService e = Executors.newFixedThreadPool(100);
while(true){

/*do something*/

for(...){
<start task>
}

<wait for all task termination>

for each String in result{
<start task>
}

<wait for all task termination>
}

我不能做一个 e.shutdown 因为我有一段时间(真的),我需要重用executorService......

你能帮助我吗?你能给我推荐一本关于 java executors 的指南/书吗?

4

5 回答 5

22

ExecutorService为您提供了一种同时执行多个任务并获取对象集合的机制(Future表示任务的异步计算)。

Collection<Callable<?>> tasks = new LinkedList<Callable<?>>();
//populate tasks
for (Future<?> f : executorService.invokeAll(tasks)) { //invokeAll() blocks until ALL tasks submitted to executor complete
    f.get(); 
}

如果您有Runnables 而不是Callables,您可以使用以下方法轻松地将 aRunnable转换为 a Callable<Object>

Callable<?> c = Executors.callable(runnable);
于 2009-08-24T12:48:05.483 回答
14

你能给我推荐一本关于java executors的指南/书吗?

我可以回答这部分:

Brian Goetz(与 Tim Peierls、 Joshua Bloch、Joseph Bowbeer、David Holmes 和Doug Lea合着)的Java Concurrency in Practice很可能是您最好的选择。

不仅仅是关于执行器,而是涵盖了java.util.concurrent包的一般性,以及基本的并发概念和技术,以及一些高级主题,例如 Java 内存模型。

于 2009-08-24T12:52:43.860 回答
14

与其直接提交Runnables 或CallablesExecutor并存储相应的Future返回值,我建议使用CompletionService实现Future 在完成时检索每个。这种方法将任务的生产与已完成任务的消耗分离,例如,允许在一段时间内在生产者线程上发起新任务。

Collection<Callable<Result>> workItems = ...
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletionService<Result> compService = new ExecutorCompletionService<Result>(executor);

// Add work items to Executor.
for (Callable<Result> workItem : workItems) {
  compService.submit(workItem);
}

// Consume results as they complete (this would typically occur on a different thread).
for (int i=0; i<workItems.size(); ++i) {
  Future<Result> fut = compService.take(); // Will block until a result is available.
  Result result = fut.get(); // Extract result; this will not block.
}
于 2009-08-24T13:03:15.330 回答
2

当您提交到执行器服务时,您将获得一个Future对象。

将这些对象存储在一个集合中,然后依次调用每个对象的get()get()阻塞,直到底层作业完成,因此结果是,get()一旦所有底层作业完成,对 each 的调用就会完成。

例如

Collection<Future> futures = ...
for (Future f : futures) {
   Object result = f.get();
   // maybe do something with the result. This could be a
   // genericised Future<T>
}
System.out.println("Tasks completed");

一旦所有这些都完成了,然后开始你的第二次提交。请注意,这可能不是对线程池的最佳使用,因为它将变为休眠状态,然后您将重新填充它。如果可能的话,试着让它忙着做事。

于 2009-08-24T12:44:58.960 回答
1
ExecutorService executor = ...
//submit tasks
executor.shutdown(); // previously submitted tasks are executed, 
                     // but no new tasks will be accepted
while(!executor.awaitTermination(1, TimeUnit.SECONDS))
    ;

如果不创建自定义 ExecutorService,就没有简单的方法来做你想做的事。

于 2010-04-06T13:28:32.367 回答