2

我正在尝试读取文件夹中的多个文件并使用线程同时处理它们。

程序的结构是这样的:

// Assuming there are 5 files in the directory

// creating the threads
ExecutorService pool = Executors.newFixedThreadPool(5)
ExecutorCompletionService service = new ExecutorCompletionService(pool)

directory.listFiles().each { eachFile ->
   service.submit(new FileReader(eachFile, param2))
}


// the FileReader class
class FileReader implements Callable {
    File file
    String param
    FileReader(File file, String param){
        this.file = file
        this.param = param
    }

   Object call(){
      LOG.info("Processing file" + filePath)
      ConfigInfo configInfo = new ConfigInfo()
  configInfo.setFilePath(filePath);
  configInfo.setReaderUid(readerUid);
  configInfo.setPatternsMap(patternsMap);
  new LogfileDataProcessor(configObject, param).processFileContent()
   }

}

这里的 call 方法创建另一个对象并在其上调用一个方法。

但奇怪的是,程序在调用方法中执行了一些行后终止(它没有到达其中的最终语句)。我在这里很困惑。有人可以对正在发生的事情有所了解。请帮我

4

2 回答 2

1

您需要您的程序等待线程完成。例如,您可以为此使用 CountDounLatch:

CountDownLatch latch = new CountDownLatch(numberOfFilesInDirectory);

directory.listFiles().each { eachFile ->
   service.submit(new FileReader(eachFile, param2))
}

latch.await();


// And in your Callable:
class FileReader implements Callable {
File file
String param
FileReader(File file, String param){
    this.file = file
    this.param = param
}

public Object call() {
    try {
    LOG.info("Processing file" + filePath)
    ConfigInfo configInfo = new ConfigInfo()
    configInfo.setFilePath(filePath);
    configInfo.setReaderUid(readerUid);
    configInfo.setPatternsMap(patternsMap);
    new LogfileDataProcessor(configObject, param).processFileContent();
    } finally {
        latch .countDown();
    }
}

您可以将闩锁作为构造函数参数传递给线程。

于 2012-05-16T05:49:28.680 回答
0

首先,你怎么知道你的call方法中只执行了几行而不是全部?我看到您发布的代码发生这种情况的唯一方法是创建该ConfigInfo对象会引发异常。

Callable我尝试重现您所描述的场景,但只有在我将工作卸载到新的守护进程时,我才设法让我的程序退出Thread。有关我的 SSCCE,请参见下文。根据您是否将thread.setDaemon( true );调用留在其中,程序会在所有Threads 执行后很好地完成,或者在任何输出写入控制台之前完成。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceDemo {
  private static ExecutorService pool = Executors.newFixedThreadPool( 5 );

  private static void createAndExecuteThreads(){
    for( int i =0; i< 5; i++ ){
      pool.submit( new DummyCallable( "Callable " + i ) );
    }
  }

  public static void main( String[] args ) {
    createAndExecuteThreads();
    pool.shutdown();
  }
  private static class DummyCallable implements Callable<Object>{
    private final String message;

    private DummyCallable( String amessage ) {
      message = amessage;
    }

    @Override
    public Object call() throws Exception {
      Runnable runnable = new Runnable() {
        @Override
        public void run() {
          try {
            Thread.sleep( 5000 );
            System.out.println( "message = " + message );
          } catch ( InterruptedException e ) {
            e.printStackTrace();
          }
        }
      };
      Thread thread = new Thread( runnable );
      thread.setDaemon( true );
      thread.start();
      return null;
    }
  }

}

你有可能在你的new LogfileDataProcessor(configObject, param).processFileContent()方法中做类似的事情吗?

于 2012-05-16T05:48:21.907 回答