我的任务是处理文件目录,如果出现任何问题,需要抛出 IOException。我还需要它跑得更快,所以我将完成的工作分成多个线程并等待它们的终止。它看起来像这样:
//Needs to throw IOException so the rest of the framework handles it properly.
public void process(File directory) throws IOException {
ExecutorService executorService =
new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
//Convenience class to walk over relevant file types.
Source source = new SourceImpl(directory);
while (source.hasNext()) {
File file = source.next();
executorService.execute(new Worker(file));
}
try {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
executorService.shutdownNow();
throw new IOException("Worker thread had a problem!");
}
}
而Worker线程基本上是:
private class Worker implements Runnable {
private final File file;
public Worker(File file) { this.file = file; }
@Override
public void run() {
try {
//Do work
} catch (IOException e) {
Thread.currentThread().interrupt();
}
}
}
期望的行为是,如果任何 Worker 有一个 IOException,那么生成线程就会意识到它,并且可以反过来抛出它自己的 IOException。这是我能想到的让 Worker 线程发出错误信号的最好方法,但我仍然不确定我是否设置正确。
那么,首先,这会达到我的预期吗?如果 Worker 线程在 run() 中出现错误,调用是否Thread.currentThread().interrupt();
会引发 InterruptedException 以使其被阻塞捕获executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
?
其次,如果一个正在运行的 Worker 在所有线程都排队之前调用它的中断会发生什么?在阻塞 try/catch 块之前?
最后(也是最重要的),有没有更优雅的方式来实现我的目标?我想让所有无数子线程执行直到完成或直到其中任何一个出现错误,此时我想在生成线程中处理它(通过有效地使整个目录失败)。
解决方案
根据答案,这是我最终使用的实现。它很好地处理了我的异步需求,并在 IOExceptions 上干净且相对快速地失败。
public void process(File directory) throws IOException {
//Set up a thread pool of 16 to do work.
ExecutorService executorService = Executors.newFixedThreadPool(16);
//Arbitrary file source.
Source source = new SourceImpl(directory);
//List to hold references to all worker threads.
ArrayList<Callable<IOException>> filesToWork =
new ArrayList<Callable<IOException>>();
//Service to manage the running of the threads.
ExecutorCompletionService<IOException> ecs =
new ExecutorCompletionService<IOException>(executorService);
//Queue up all of the file worker threads.
while (source.hasNext())
filesToWork.add(new Worker(file));
//Store the potential results of each worker thread.
int n = filesToWork.size();
ArrayList<Future<IOException>> futures =
new ArrayList<Future<IOException>>(n);
//Prepare to return an arbitrary worker's exception.
IOException exception = null;
try {
//Add all workers to the ECS and Future collection.
for (Callable<IOException> callable : filesToWork)
futures.add(ecs.submit(callable));
for (int i = 0; i < n; i++) {
try {
//Get each result as it's available, sometimes blocking.
IOException e = ecs.take().get();
//Stop if an exception is returned.
if (e != null) {
exception = e;
break;
}
//Also catch our own exceptions.
} catch (InterruptedException e) {
exception = new IOException(e);
break;
} catch (ExecutionException e) {
exception = new IOException(e);
break;
}
}
} finally {
//Stop any pending tasks if we broke early.
for (Future<IOException> f : futures)
f.cancel(true);
//And kill all of the threads.
executorService.shutdownNow();
}
//If anything went wrong, it was preserved. Throw it now.
if (exception != null)
throw exception;
}
和
//Does work, and returns (not throws) an IOException object on error.
private class Worker implements Callable<IOException> {
private final File file;
public Worker(File file) { this.file = file; }
@Override
public IOException call() {
try {
//Do work
} catch (IOException e) {
return e;
}
return null;
}
}