1

我有一个目录,其中包含我需要解析的 1000 个 csv 文件。我已经实现了 Java 的 ExecutorService 类来完成这项工作,其中我为每个线程分配了一个 csv 文件来解析。我的机器上有 4 个内核。与单线程应用程序相比,我确实获得了一些效率。但是,当我看到 CPU 利用率(使用任务管理器)时,它似乎并没有利用所有的 CPU 能力,使用的 CPU 百分比仅为 30%-40% 左右。我只是想知道我的方法是否正确。

File dir = new File(file);
if(dir.isDirectory()){
    File[] files = dir.listFiles();

for(File f : files){
    String file_abs_path = f.getAbsolutePath();
    int index = file_abs_path.lastIndexOf("/") + 1;
    file_name = file_abs_path.substring(index);
    futuresList.add(eservice.submit(new MyParser(file_abs_path)));
}

Object gpDocs;
for(Future<List<MyObj>> future:futuresList) {
try {
    docs = future.get();
    arrayList = (List<MyObj>)docs;
    Iterator<MyObj> it = arrayList.iterator();
    while(it.hasNext()){
    doc = createDocument(file_name,it.next());
    try{
        //somefunction(doc);
        }catch(Exception e){}
}}catch (InterruptedException e) {}
catch (ExecutionException e) {}
}} 

我只是想知道我的方法是否正确?任何帮助,将不胜感激。

谢谢

解析器的代码是:

public List<MyObj> call(){
    ColumnPositionMappingStrategy<MyObj> strat = 
new ColumnPositionMappingStrategy<MyObj>();
strat.setType(MyObj.class);
String[] columns = new String[] {//list of columns in the csv file}; 

strat.setColumnMapping(columns);
CsvToBean<MyObj> csv = new CsvToBean<MyObj>();
BufferedReader reader = null;
String doc_line = "";
String[] docs;
String doc = "";
File dir = new File(file_path);
try{
    int comma_count = 0;
    reader = new BufferedReader(new FileReader(dir));
    while((doc_line = reader.readLine()) != null){
        docs = doc_line.split(",");
    doc += docs[i] + " ";
    }
    reader.close();
    }catch (IOException e) {/*e.printStackTrace();*/}
    return(csv.parse(strat,new StringReader(doc)));
}
4

3 回答 3

1

正如所评论的,您的任务很可能是 IO 绑定的,因为大多数涉及 IO 到硬盘驱动器的任务通常都是如此。

您可以期望的最佳性能最有可能将读取线程与处理分离。可能,单个读取线程,读取尽可能大的数据块并将其送入队列进行处理将产生最佳的整体吞吐量。处理线程的数量只是跟上读数所必需的。

于 2013-07-03T19:07:15.103 回答
0

但是,当我看到 CPU 利用率(使用任务管理器)时,它似乎并没有利用所有的 CPU 能力,使用的 CPU 百分比仅为 30%-40%

这是因为,您正在处理files顺序而不是并行。请参阅以下代码块:

for(Future<List<MyObj>> future:futuresList) {
try {
    docs = future.get(); //(1)
    arrayList = (List<MyObj>)docs;//(2)

第 (1) 行是您实际执行线程的行。但它在做什么?
正如您在评论中所说,
这只是一个使用 opencsv 的简单 CSV 文件解析器,它实现了 Callable 接口,并在它覆盖的 call 方法中具有解析逻辑。
而且您正在使线程按顺序执行而不是并行执行。此外,真正的 IO 在以下代码中完成: Iterator it = arrayList.iterator();

while(it.hasNext()){
doc = createDocument(file_name,it.next());
try{
    //somefunction(doc);
    }catch(Exception e){}

我认为createDocument是一个详尽的 IO 绑定操作。并且此方法是按顺序执行的,而不是由每个线程并行执行。

于 2013-07-03T19:21:50.047 回答
0

正如已经发布的那样,请确保处理发生在您的线程中,而不是在单个调度线程中。我会尝试这个相对简单的解决方案:

  • 给每个线程一个File对象来处理。这确保了实际工作在每个线程中完成,并且您打开的文件数量有限。(例如,如果您将 an 传递InputStream给线程,那么您将立即打开所有文件。通过传递Files,您将只有与最大并发线程数一样多的打开文件。)
  • 让它使用带有大缓冲区的 aBufferedReader或 a BufferedInputStream,例如 1-4MB。这使您的应用程序一次读取大块,这比多个线程读取小块和硬盘磁头不断寻找要高效得多。希望您的操作系统会安排此类读取,以便它们不会同时发生。
  • 使用 调度它们ExecutorService,可能使用的数字略大于处理器的数量,这样如果某个线程被 IO 阻塞,则有足够多的其他线程可以工作(当然,如果有适合他们的工作)。

所以结果可能看起来像这样:

File[] files = dir.listFiles();
final int bufSize = 1024*1024;

// prepare tasks
List<Callable<List<MyObj>>> tasks
    = new ArrayList<Callable<List<MyObj>>>();
for(final File file : files)
    tasks.add(new Callable<List<MyObj>>() {
        public List<MyObj> call() throws Exception {
            Reader r = new InputStreamReader(
                    new BufferedInputStream(
                        new FileInputStream(file), bufSize)
                );
            try {
                // do processing
            } finally {
                r.close();
            }
        }
    });

// run them
int threadCount = Runtime.getRuntime().availableProcessors() + 2;
List<Future<List<MyObj>>> results
    = Executors.newFixedThreadPool(threadCount).invokeAll(tasks);

如果事实证明处理是瓶颈,而不是磁盘 IO,您可以通过使用java.nio.

(注:我只是草绘了代码,我没有尝试编译它。)

于 2013-07-03T20:01:13.967 回答