我的任务是处理文件目录,如果出现任何问题,需要抛出 IOException。我还需要它跑得更快,所以我将完成的工作分成多个线程并等待它们的终止。它看起来像这样:
//Needs to throw IOException so the rest of the framework handles it properly.
public void process(File directory) throws IOException {
ExecutorService executorService =
new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
//Convenience class to walk over relevant file types.
Source source = new SourceImpl(directory);
while (source.hasNext()) {
File file = source.next();
executorService.execute(new Worker(file));
try {
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
throw new IOException("Worker thread had a problem!");
private class Worker implements Runnable {
private final File file;
public Worker(File file) { this.file = file; }
public void run() {
try {
//Do work
} catch (IOException e) {
期望的行为是,如果任何 Worker 有一个 IOException,那么生成线程就会意识到它,并且可以反过来抛出它自己的 IOException。这是我能想到的让 Worker 线程发出错误信号的最好方法,但我仍然不确定我是否设置正确。
那么,首先,这会达到我的预期吗?如果 Worker 线程在 run() 中出现错误,调用是否Thread.currentThread().interrupt();
会引发 InterruptedException 以使其被阻塞捕获executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
其次,如果一个正在运行的 Worker 在所有线程都排队之前调用它的中断会发生什么?在阻塞 try/catch 块之前?
根据答案,这是我最终使用的实现。它很好地处理了我的异步需求,并在 IOExceptions 上干净且相对快速地失败。
public void process(File directory) throws IOException {
//Set up a thread pool of 16 to do work.
ExecutorService executorService = Executors.newFixedThreadPool(16);
//Arbitrary file source.
Source source = new SourceImpl(directory);
//List to hold references to all worker threads.
ArrayList<Callable<IOException>> filesToWork =
new ArrayList<Callable<IOException>>();
//Service to manage the running of the threads.
ExecutorCompletionService<IOException> ecs =
new ExecutorCompletionService<IOException>(executorService);
//Queue up all of the file worker threads.
while (source.hasNext())
filesToWork.add(new Worker(file));
//Store the potential results of each worker thread.
int n = filesToWork.size();
ArrayList<Future<IOException>> futures =
new ArrayList<Future<IOException>>(n);
//Prepare to return an arbitrary worker's exception.
IOException exception = null;
try {
//Add all workers to the ECS and Future collection.
for (Callable<IOException> callable : filesToWork)
for (int i = 0; i < n; i++) {
try {
//Get each result as it's available, sometimes blocking.
IOException e = ecs.take().get();
//Stop if an exception is returned.
if (e != null) {
exception = e;
//Also catch our own exceptions.
} catch (InterruptedException e) {
exception = new IOException(e);
} catch (ExecutionException e) {
exception = new IOException(e);
} finally {
//Stop any pending tasks if we broke early.
for (Future<IOException> f : futures)
//And kill all of the threads.
//If anything went wrong, it was preserved. Throw it now.
if (exception != null)
throw exception;
//Does work, and returns (not throws) an IOException object on error.
private class Worker implements Callable<IOException> {
private final File file;
public Worker(File file) { this.file = file; }
public IOException call() {
try {
//Do work
} catch (IOException e) {
return e;
return null;