1

在我的 java web 应用程序中是一个基于文件的集成。他们过去常常在我们的生产服务器 opt/app/proceed/ 文件夹中发送一堆 xml 文件(例如:10000)。但是根据当前配置,我们的应用程序能够在顺序处理中处理 200 个文件。因此,延迟处理文件。我正在尝试以并行方式增加文件处理的数量。请找到代码块供您参考。

public class FileEx {

   public static void main(String[] args) throws IOException {
       String fileDir = "C:\\Users\\inputfiles"; //contains more than 10000 files
       new FileEx().traverseFilesFromDir(new File(fileDir));
   }

   public void traverseFilesFromDir(File dir) throws IOException {
       List<File> files = new ArrayList<File>();
       if (dir == null || !dir.isDirectory()) {
           throw new IllegalArgumentException("Not a valid directory (value: " + dir + ").");
       }
       File[] acknFiles = dir.listFiles();
       int fileCount = (acknFiles == null ? 0 : acknFiles.length);

       System.out.println("fileCount:::::::::" + fileCount);

       Arrays.sort(acknFiles, new Comparator<File>() {
           public int compare(File f1, File f2) {
               return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified());
           }
       });

       **int maxNoFiles = acknFiles.length <= 500 ? acknFiles.length : 500;**
       System.out.println(acknFiles.length + " Ackn found and starting to process oldest " + maxNoFiles + " files.");

       for (int i = 0; i < maxNoFiles; i++) {
           files.add(acknFiles[i]);
       }

       int fileCount1 = (files == null ? 0 : files.size());

       if (fileCount1 > 0) {
           for (int i = 0; i < fileCount1; i++) {

               boolean success = true;// processFile(files.get(i));
               if (success) {
                   System.out.println("File Successfully processed.");
               }
           }
       }
   }
}

如何着手改变文件处理方式。等待需要的支持/指导。

4

2 回答 2

0

您可以通过 执行此操作Files.walk(),它返回一个Stream<Path>对象。如果您想并行处理此流,您可以尝试使用parallel()collect(Collectors.toList()).parallelStream()。因为Files.walk()懒惰地评估,parallel()单独可能无法有效利用所有可用的内核。

在此之上,您可以根据需要应用排序过滤。您的处理步骤可以通过forEachOrdered()流末尾的 a 来实现。

这是一个例子:

Files.walk(Paths.get("/path/to/root/directory")) // create a stream of paths
    .collect(Collectors.toList()) // collect paths into list to better parallize
    .parallelStream() // process this stream in multiple threads
    .filter(Files::isRegularFile) // filter out any non-files (such as directories)
    .map(Path::toFile) // convert Path to File object
    .sorted((a, b) -> Long.compare(a.lastModified(), b.lastModified())) // sort files date
    .limit(500) // limit processing to 500 files (optional)
    .forEachOrdered(f -> {
        // do processing here
        System.out.println(f);
    });
于 2021-02-04T10:56:26.653 回答
-1

// java 8.1 及更高版本 可以使用的并行流 // 这里 paralell() 默认使用 ForkJoinPool.commonPool() 执行

Files.lines(Paths.get( files.get(i))
  .parallel()
  .map(Your_FileBean::new) 
  .forEach(/*process Your_FileBean*/);
于 2021-02-04T10:51:13.543 回答