3

我的任务是处理文件目录,如果出现任何问题,需要抛出 IOException。我还需要它跑得更快,所以我将完成的工作分成多个线程并等待它们的终止。它看起来像这样:

//Needs to throw IOException so the rest of the framework handles it properly.
public void process(File directory) throws IOException {
    ExecutorService executorService =
        new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());

    //Convenience class to walk over relevant file types.
    Source source = new SourceImpl(directory);
    while (source.hasNext()) {
        File file = source.next();
        executorService.execute(new Worker(file));
    }

    try {
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        throw new IOException("Worker thread had a problem!");
    }
}

而Worker线程基本上是:

private class Worker implements Runnable {
    private final File file;
    public Worker(File file) { this.file = file; }

    @Override
    public void run() {
        try {
            //Do work
        } catch (IOException e) {
            Thread.currentThread().interrupt();
        }
    }
}

期望的行为是,如果任何 Worker 有一个 IOException,那么生成线程就会意识到它,并且可以反过来抛出它自己的 IOException。这是我能想到的让 Worker 线程发出错误信号的最好方法,但我仍然不确定我是否设置正确。

那么,首先,这会达到我的预期吗?如果 Worker 线程在 run() 中出现错误,调用是否Thread.currentThread().interrupt();会引发 InterruptedException 以使其被阻塞捕获executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

其次,如果一个正在运行的 Worker 在所有线程都排队之前调用它的中断会发生什么?在阻塞 try/catch 块之前?

最后(也是最重要的),有没有更优雅的方式来实现我的目标?我想让所有无数子线程执行直到完成或直到其中任何一个出现错误,此时我想在生成线程中处理它(通过有效地使整个目录失败)。


解决方案

根据答案,这是我最终使用的实现。它很好地处理了我的异步需求,并在 IOExceptions 上干净且相对快速地失败。

public void process(File directory) throws IOException {
    //Set up a thread pool of 16 to do work.
    ExecutorService executorService = Executors.newFixedThreadPool(16);
    //Arbitrary file source.
    Source source = new SourceImpl(directory);
    //List to hold references to all worker threads.
    ArrayList<Callable<IOException>> filesToWork =
        new ArrayList<Callable<IOException>>();
    //Service to manage the running of the threads.
    ExecutorCompletionService<IOException> ecs =
        new ExecutorCompletionService<IOException>(executorService);

    //Queue up all of the file worker threads.
    while (source.hasNext())
        filesToWork.add(new Worker(file));

    //Store the potential results of each worker thread.
    int n = filesToWork.size();
    ArrayList<Future<IOException>> futures =
        new ArrayList<Future<IOException>>(n);

    //Prepare to return an arbitrary worker's exception.
    IOException exception = null;
    try {
        //Add all workers to the ECS and Future collection.
        for (Callable<IOException> callable : filesToWork)
            futures.add(ecs.submit(callable));
        for (int i = 0; i < n; i++) {
            try {
                //Get each result as it's available, sometimes blocking.
                IOException e = ecs.take().get();
                //Stop if an exception is returned.
                if (e != null) {
                    exception = e;
                    break;
                }
            //Also catch our own exceptions.
            } catch (InterruptedException e) {
                exception = new IOException(e);
                break;
            } catch (ExecutionException e) {
                exception = new IOException(e);
                break;
            }
        }
    } finally {
        //Stop any pending tasks if we broke early.
        for (Future<IOException> f : futures)
            f.cancel(true);
        //And kill all of the threads.
        executorService.shutdownNow();
    }

    //If anything went wrong, it was preserved. Throw it now.
    if (exception != null)
        throw exception;
}

//Does work, and returns (not throws) an IOException object on error.
private class Worker implements Callable<IOException> {
    private final File file;
    public Worker(File file) { this.file = file; }

    @Override
    public IOException call() {
        try {
            //Do work
        } catch (IOException e) {
            return e;
        }
        return null;
    }
}
4

3 回答 3

2

interrupt()这样调用不会影响主线程。

你应该做的是让你的工人 aCallable而不是 aRunnable并允许失败异常离开call()方法。然后,使用ExecutorCompletionService执行所有 Worker 。这将允许您确定每个任务的状态,并在其中一个任务失败时在主线程中采取行动。

于 2012-07-09T19:47:14.820 回答
1

与往常一样,线程之间最好的通信是队列。让每个工作人员发送一条消息,描述其执行是如何完成的,并让生成线程从队列中读取。此外,由于生成线程知道它生成了多少个工人,它可以只计算消息以了解所有工人何时完成,而不依赖于池关闭。

于 2012-07-09T19:32:35.077 回答
1

让 Worker 实现Callable<Void>,你可以这样做:

public void process(File directory) throws IOException {
    ExecutorService executorService = new ThreadPoolExecutor(16, 16,
            Long.MAX_VALUE, TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());

    // Convenience class to walk over relevant file types.
    List<Future<Void>> futures = new ArrayList<Future<Void>>();
    Source source = new SourceImpl(directory);
    while (source.hasNext()) {
        File file = source.next();
        futures.add(executorService.submit(new Worker(file)));
    }

    try {
        for (Future<Void> future : futures) {
            future.get();
        }
    } catch (ExecutionException e) {
        throw new IOException("Worker thread had a problem!", e.getCause());
    } catch (InterruptedException e) {
        throw new IOException("Worker thread had a problem!", e);
    } finally {
        executorService.shutdown();
    }
}
于 2012-07-09T20:16:19.103 回答