最近我正在使用 Java 7 FORK/JOIN 框架和 FileChannel 来复制文件。这是我的代码(Test.java):
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Test {
private ArrayList<FileProcessor> processors = new ArrayList<FileProcessor>();
public Test(){
String outputDir = "C:\\temp";
if (!Files.isDirectory(Paths.get(outputDir))) {
System.out.println("this is not a path");
} else {
try {
//start copying file
ForkJoinPool pool = new ForkJoinPool();
int numberOfThread = 2;
File file = new File("C:\\abc.cdm");
long length = file.length();
long lengthPerCopy = (long)(length/numberOfThread);
long position = 0L;
for (int i = 0; i < numberOfThread; i++) {
FileProcessor processor = null;
if (i == numberOfThread - 1) {
//the last thread
processor = new FileProcessor("abc.cdm", "C:\\abc.cdm", "C:\\temp", position, length - position);
} else {
processor = new FileProcessor("abc.cdm", "C:\\abc.cdm", "C:\\temp", position, lengthPerCopy);
position = position + lengthPerCopy + 1;
}
processors.add(processor);
pool.execute(processor);
}
do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
System.out.printf("******************************************\n");
try
{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e)
{
e.printStackTrace();
}
} while (!isDone()); //when all the thread not been done
pool.shutdown();
System.out.println("copy done");
} catch (Exception ex) {
//out an error here...
}
}
}
private boolean isDone(){
boolean res = false;
for (int i = 0; i < processors.size(); i++) {
res = res || processors.get(i).isDone();
}
return res;
}
public static void main(String args[]) {
Test test = new Test();
}
class FileProcessor extends RecursiveTask<Integer>
{
private static final long serialVersionUID = 1L;
private long copyPosition;
private long copyCount;
FileChannel source = null;
FileChannel destination = null;
//Implement the constructor of the class to initialize its attributes
public FileProcessor(String fileName, String filePath, String outputPath, long position, long count) throws FileNotFoundException, IOException{
this.copyPosition = position;
this.copyCount = count;
this.source = new FileInputStream(new File(filePath)).getChannel().position(copyPosition);
this.destination = new FileOutputStream(new File(outputPath + "/" + fileName), true).getChannel().position(copyPosition);
}
@Override
protected Integer compute()
{
try {
this.copyFile();
} catch (IOException ex) {
Logger.getLogger(FileProcessor.class.getName()).log(Level.SEVERE, null, ex);
}
return new Integer(0);
}
private void copyFile() throws IOException {
try {
destination.transferFrom(source, copyPosition, copyCount);
}
finally {
if (source != null) {
source.close();
}
if (destination != null) {
destination.close();
}
}
}
}
}
我运行我的代码,如果线程数为 1,则文件被精确复制,但当线程数为 2 时,文件“C:\abc.cdm”为 77KB(78335),但复制后,文件“C:\ temp\abc.cdm" 只是 (39KB)。
我哪里错了,请告诉我??
更新:我的问题已经解决 问题出在 isDone 方法中,一定是:
boolean res = true;
for (int i = 0; i < processors.size(); i++) {
res = res && processors.get(i).isDone();
}
return res;
还要编辑以下代码行:
File file = new File(selectedFile[i].getPath());
long length = file.length();
new RandomAccessFile("C:\\temp\abc.cdm", "rw").setLength(length);
这只是 FORK/JOIN 用法的练习!