0

最近我正在使用 Java 7 FORK/JOIN 框架和 FileChannel 来复制文件。这是我的代码(Test.java):

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Test {
    private ArrayList<FileProcessor> processors = new ArrayList<FileProcessor>();

    public Test(){
        String outputDir = "C:\\temp";
        if (!Files.isDirectory(Paths.get(outputDir))) {
           System.out.println("this is not a path");
        } else {
            try {
                //start copying file
                ForkJoinPool pool = new ForkJoinPool();
                int numberOfThread = 2;
                File file = new File("C:\\abc.cdm"); 

                long length = file.length();
                long lengthPerCopy = (long)(length/numberOfThread);
                long position = 0L;

                for (int i = 0; i < numberOfThread; i++) {
                    FileProcessor processor = null;
                    if (i == numberOfThread - 1) {
                        //the last thread
                        processor = new FileProcessor("abc.cdm", "C:\\abc.cdm", "C:\\temp", position, length - position);
                    } else {
                        processor = new FileProcessor("abc.cdm", "C:\\abc.cdm", "C:\\temp", position, lengthPerCopy);
                        position = position + lengthPerCopy + 1;
                    }
                    processors.add(processor);
                    pool.execute(processor);
                }

                do {
                   System.out.printf("******************************************\n");
                   System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
                   System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
                   System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
                   System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
                   System.out.printf("******************************************\n");
                   try
                   {
                      TimeUnit.SECONDS.sleep(1);
                   } catch (InterruptedException e)
                   {
                      e.printStackTrace();
                   }
                } while (!isDone()); //when all the thread not been done

            pool.shutdown();
            System.out.println("copy done");
            } catch (Exception ex) {
                //out an error here...
            }
        }

    }

    private boolean isDone(){
        boolean res = false;
        for (int i = 0; i < processors.size(); i++) {
            res = res || processors.get(i).isDone();
        }
        return res;
    }

    public static void main(String args[]) {
        Test test = new Test();
    }

    class FileProcessor extends RecursiveTask<Integer>
    {
       private static final long serialVersionUID = 1L;
       private long copyPosition;
       private long copyCount;
       FileChannel source = null;
       FileChannel destination = null;
       //Implement the constructor of the class to initialize its attributes
       public FileProcessor(String fileName, String filePath, String outputPath, long position, long count) throws FileNotFoundException, IOException{
           this.copyPosition = position;
           this.copyCount = count;
           this.source = new FileInputStream(new File(filePath)).getChannel().position(copyPosition);
           this.destination = new FileOutputStream(new File(outputPath + "/" + fileName), true).getChannel().position(copyPosition);
       }

       @Override
       protected Integer compute()
       {
           try {
               this.copyFile();
           } catch (IOException ex) {
               Logger.getLogger(FileProcessor.class.getName()).log(Level.SEVERE, null, ex);
           }
           return new Integer(0);
       }

       private void copyFile() throws IOException {

            try {
                destination.transferFrom(source, copyPosition, copyCount);
            } 
            finally {
                if (source != null) {
                    source.close();
                }
                if (destination != null) {
                    destination.close();
                }
            }
        }
    }

}

我运行我的代码,如果线程数为 1,则文件被精确复制,但当线程数为 2 时,文件“C:\abc.cdm”为 77KB(78335),但复制后,文件“C:\ temp\abc.cdm" 只是 (39KB)。

我哪里错了,请告诉我??

更新:我的问题已经解决 问题出在 isDone 方法中,一定是:

boolean res = true;
for (int i = 0; i < processors.size(); i++) {
    res = res && processors.get(i).isDone();
}
return res;

还要编辑以下代码行:

File file = new File(selectedFile[i].getPath());
long length = file.length();
new RandomAccessFile("C:\\temp\abc.cdm", "rw").setLength(length);

这只是 FORK/JOIN 用法的练习!

4

1 回答 1

2

您的 isDone() 方法确实是错误的,您在原始问题中更正了它。但是 FileProcessor 中还有另一个问题。您假设将目标位置设置在文件末尾之后将在您传输到文件时自动增大文件。不是这种情况。

您的第一个段将始终写入,因为写入位置为 0 并且文件的长度不能小于零。那是您看到的 39K,大约是总文件大小的一半。第二段从未被写入。

为了让您的代码运行,您可以在开始时执行以下操作:

 File file = new File("C:\\abc.cdm"); 
 long length = file.length();

 new RandomAccessFile("C:\\temp\\abc.cdm", "rw").setLength(length);`
于 2014-12-25T08:47:56.330 回答