
我熟悉 Python 中的 multiprocessing 包,它使用起来非常简单,所以理想情况下我正在寻找类似的东西:

public interface FictionalFunctor<T>{
  void handle(T arg);

public class FictionalThreadPool {
  public FictionalThreadPool(int threadCount){
  public <T> FictionalThreadPoolMapResult<T> map(FictionalFunctor<T> functor, List<T> args){
    // Executes the given functor on each and every arg from args in parallel. Returns, when
    // all the parallel branches return.
    // FictionalThreadPoolMapResult allows to abort the whole mapping process, at the least.

dir = getDirectoryToProcess();
pool = new FictionalThreadPool(10);   // 10 threads in the pool
pool.map(new FictionalFunctor<File>(){ 
  public void handle(File file){
    // process the file
}, dir.listFiles());




编辑 1


public void processAllFiles() throws IOException {
  ExecutorService exec = Executors.newFixedThreadPool(6);
  BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(5); // Figured we can keep the contents of 6 files simultaneously.
  exec.submit(new MyCoordinator(exec, tasks));
  for (File file : dir.listFiles(getMyFilter()) {
    try {
      tasks.add(new MyTask(file));
    } catch (IOException exc) {
      System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));

public class MyTask implements Runnable {
  private final byte[] m_buffer;
  private final String m_name;

  public MyTask(File file) throws IOException {
    m_name = file.getName();
    m_buffer = Files.toByteArray(file);

  public void run() {
    // Process the file contents

private class MyCoordinator implements Runnable {
  private final ExecutorService m_exec;
  private final BlockingQueue<Runnable> m_tasks;

  public MyCoordinator(ExecutorService exec, BlockingQueue<Runnable> tasks) {
    m_exec = exec;
    m_tasks = tasks;

  public void run() {
    while (true) {
      Runnable task = m_tasks.remove();


  1. 文件一个接一个地被读取。
  2. 文件内容保存在专用MyTask实例中。
  3. 一个容量为 5 的阻塞队列来保存任务。我依靠服务器一次最多保留 6 个文件的内容的能力——队列中有 5 个,另一个完全初始化的任务等待进入队列。
  4. 一个特殊MyCoordinator的任务从队列中获取文件任务并将它们分派到同一个池中。

好的,所以有一个错误 - 可以创建超过 6 个任务。即使所有池线程都忙,也会提交一些。我打算以后解决它。

问题是它根本不起作用。第MyCoordinator一次删除时线程阻塞 - 这很好。但它永远不会解除阻塞,即使新任务已放入队列中。谁能告诉我我做错了什么?


3 回答 3


比使用 ExecuterService 更简单的解决方案是实现您自己的生产者-消费者方案。有一个创建任务并提交到 LinkedBlockingQueue 或 ArrayBlockingQueue 的线程,并有检查此队列以检索任务并执行它们的工作线程。您可能需要一种特殊的任务名称 ExitTask 来强制工作人员退出。因此,在作业结束时,如果您有 n 个工作人员,您需要将 n 个 ExitTasks 添加到队列中。

于 2012-07-27T18:41:11.960 回答

您正在寻找的线程池是ExecutorService类。您可以使用newFixedThreadPool. 这使您可以轻松地实现生产者-消费者模式,池为您封装了所有队列和工作器功能:

ExecutorService exec = Executors.newFixedThreadPool(10);


class ThreadTask implements Runnable {
    public void run() {
       // task code


exec.submit(new ThreadTask());
// alternatively, using an anonymous type
exec.submit(new Runnable() {
           public void run() {
              // task code


于 2012-07-27T18:17:55.107 回答

Basically, what @Tudor said, use an ExecutorService, but I wanted to expand on his code and I always feel strange editing other people's posts. Here's a sksleton of what you would submit to the ExecutorService:

public class MyFileTask implements Runnable {
   final File fileToProcess;

   public MyFileTask(File file) {
      fileToProcess = file;

   public void run() {
      // your code goes here, e.g.
      // if you prefer, implement Callable instead

See also my blog post here for some more details if you get stuck

Since processing Files often leads to IOExceptions, I'd prefer a Callable (which can throw a checked Exception) to a Runnable, but YMMV.

于 2012-07-27T19:14:00.153 回答