2

我的任务是传输大文件,大小约为 5 GB 的文件,因此是读写操作。

我写了一个单线程版本,一次读取 5k,然后立即将 5K 写入另一个位置,单线程版本工作正常,我使用 412 MB 的 zip 文件夹进行测试,大约需要 5 秒。

我的目标是实际编写一个多线程版本,想到的自然设计模式是生产者(读者)消费者(作者)模式。

下面首先是我基于多线程版本的单线程版本:


   import java.net.*;
import java.io.*;
import java.nio.*;
import java.util.Arrays;

public class ReadWrite {

    URI readFrom = null;
    URI writeTo = null;
    //streams
    FileInputStream fis = null;
    FileOutputStream fos = null;
    // good buffer size in Java is generally between 2k to 8k
    byte[] byteBuffer = new byte[5 * 1024];
    //just for testing 
    private int readSoFar = 0;
    //const
    ReadWrite(URI readFrom, URI writeTo) {
        this.readFrom = readFrom;
        this.writeTo = writeTo;

    }

    public URI getReadFrom() {
        return readFrom;
    }

    public void setReadFrom(URI readFrom) {
        this.readFrom = readFrom;
    }

    public URI getWriteTo() {
        return writeTo;
    }

    public void setWriteTo(URI writeTo) {
        this.writeTo = writeTo;
    }

    public void process() throws FileNotFoundException {

        // by chunks therefore buffer
        File fileToRead = new File(readFrom);
        File fileToWrite = new File(writeTo);
        try {
            // if read & write destinations exist
            if (fileToRead.exists()) {
                fis = new FileInputStream(fileToRead);
                // instantiate OutputStream

                fos = new FileOutputStream(fileToWrite);
                // read a chunk, then write , update read position, until there
                // is no more to read
                try {
                    int writeCounter = 0;
                    // read
                    while ((fis.read(byteBuffer, 0, byteBuffer.length)) != -1) {

                        try {
                            //just for testing & seeing output/progress
                            readSoFar= readSoFar + byteBuffer.length;
                            System.out.println("readSoFar:" + readSoFar);
                            // write
                            fos.write(byteBuffer);
                            // clear previous data
                            Arrays.fill(byteBuffer, (byte) 0);
                            System.out.println("writeCounter: " + writeCounter);
                            writeCounter++;
                        } catch (IOException exc) {
                            exc.printStackTrace();

                        }
                    }
                } catch (IOException exc) {
                    exc.printStackTrace();
                }

            } else {
                throw new FileNotFoundException();
            }
        } finally {
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (fos != null) {
                try {
                    fos.flush();
                    fos.close();

                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }// end class ReadWrite

}

主类 FileCopy(单线程):

public class FileCopy {

    public static void main(String[] args) {

     try {
        try {
            //wls1033_dev.zip
            new ReadWrite( new URI("file:/C:/Users/anaim/Pictures/wls1033_dev.zip"),new URI("file:/C:/Users/anaim/Pictures/result/wls1033_dev.zip")).process();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } catch (FileNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    }

}//end main class

输出应如下所示:

. . . readSoFar:423198720 writeCounter:82655 readSoFar:423203840 writeCounter:82656 readSoFar:423208960 writeCounter:82657

很明显,目标文件不应该被修改并且与原始文件相同。


下面是多线程版本,它基于单线程代码,除了线程逻辑和锁通过同步,等待,通知实现。多线程实际上正在工作,没有死锁或活力问题,但是读/写操作不会终止。

问题似乎出在变量“ readState”及其 getReadState()方法上,当在主类中调用时,readState == -1 似乎无法正常运行,而不是读/写应该终止。

多线程读写(ReadProducerWriteConsumer 类)提供了一个 readProcess 和 writeProcess,它们根据“布尔空”变量/标志的值等待同步部分

import java.net.*;
import java.io.*;
import java.nio.*;
import java.util.Arrays;

public class ReadProducerWriteConsumer {

    URI readFrom = null;
    URI writeTo = null;
    //
    FileInputStream fis = null;
    FileOutputStream fos = null;
    // good buffer size in Java is generally between 2k to 8k
    byte[] byteBuffer = new byte[1024];
    //
    File fileToRead = null;
    File fileToWrite = null;
    //
    private int readSoFar = 0;
    int writeCounter = 0;
    //
    volatile private int readState = 0;
    // Consumer & Producer state , hence has anything been read in order to be
    // written
    boolean empty = true;

    ReadProducerWriteConsumer(URI readFrom, URI writeTo) {
        this.readFrom = readFrom;
        this.writeTo = writeTo;
        //
        fileToRead = new File(readFrom);
        fileToWrite = new File(writeTo);
    }

    public long getReadState() {
        return this.readState;
    }

    public void setReadState(int readState) {
        this.readState = readState;
    }

    public URI getReadFrom() {
        return readFrom;
    }

    public void setReadFrom(URI readFrom) {
        this.readFrom = readFrom;
    }

    public URI getWriteTo() {
        return writeTo;
    }

    public void setWriteTo(URI writeTo) {
        this.writeTo = writeTo;
    }

    public synchronized void readProcess() throws FileNotFoundException {

        // while false, while data is being written , wait
        while (empty == false) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        // by chunks therefore buffer
        File fileToRead = new File(readFrom);

        try {
            // if read & write destinations exist
            if (fileToRead.exists()) {
                fis = new FileInputStream(fileToRead);
                // read a chunk
                try {
                    // while readSoFar!=-1
                    while (((this.readState = fis.read(byteBuffer, 0,
                            byteBuffer.length)) != -1) && empty != false) {

                        // just for testing & seeing output/progress
                        readSoFar = readSoFar + byteBuffer.length;
                        System.out.println("readSoFar:" + readSoFar);

                        // read a chunck , now that buffer is full set emoty to
                        // false
                        empty = false;

                    }
                } catch (IOException exc) {
                    exc.printStackTrace();
                }

            } else {
                throw new FileNotFoundException();
            }
        } finally {
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // new data has been read , notify all threads waiting to consume
            // data
            notifyAll();
        }
    }

    public synchronized void writeProcess() throws FileNotFoundException {
        // while true, therefore there is nothing to write, wait
        while (empty == true) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // by chunks therefore buffer
        File fileToWrite = new File(writeTo);

        try {
            // instantiate OutputStream
            fos = new FileOutputStream(fileToWrite);
            // then write , update read position

            // write
            try {
                fos.write(byteBuffer);
                // clear previous data
                Arrays.fill(byteBuffer, (byte) 0);
                System.out.println("writeCounter: " + writeCounter);
                writeCounter++;
            } catch (IOException exc) {
                exc.printStackTrace();

            }

        } finally {

            if (fos != null) {
                try {
                    fos.flush();
                    fos.close();

                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // new data has been written , notify all threads waiting to
            // read/produce more data
            empty = true;
            notifyAll();
        }
    }

}//end class ReadProducerWriteConsumer

readRunnable 类实现一个 runnable 并调用 ReadProducerWriteConsumer.readProcess()

import java.io.FileNotFoundException;

public class readRunnable implements Runnable {
    ReadProducerWriteConsumer ReaderProducerWriterConsumer = null;

    public readRunnable(ReadProducerWriteConsumer readerProducerWriterConsumer) {
        super();
        ReaderProducerWriterConsumer = readerProducerWriterConsumer;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            ReaderProducerWriterConsumer.readProcess();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    // call ReaderProducerWriterConsumer read method
}//end readRunnable class

writeRunnable 类实现了一个 runnable 并调用 ReadProducerWriteConsumer.writeProcess()

import java.io.FileNotFoundException;


public class writeRunnable implements Runnable
    {
        ReadProducerWriteConsumer ReaderProducerWriterConsumer = null; 

        public writeRunnable (ReadProducerWriteConsumer readerProducerWriterConsumer) {
            super();
            ReaderProducerWriterConsumer = readerProducerWriterConsumer;
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                ReaderProducerWriterConsumer.writeProcess();
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //ReaderProducerWriterConsumer write method
    }//end writeRunnable

实例化线程并创建读/写线程直到没有更多要读取的主类,使用 ReadProducerWriteConsumer.getReadState() 检查此条件

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.util.Arrays;

public class FileCopy {

    public static void main(String[] args) {

        ReadProducerWriteConsumer ReaderProducerWriterConsumer = null;
        try {
            ReaderProducerWriterConsumer = new ReadProducerWriteConsumer(
                    new URI("file:/C:/Users/anaim/Pictures/pic.png"), new URI(
                            "file:/C:/Users/anaim/Pictures/result/pic.png"));
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Thread readThread = null;
        Thread writeThread = null;

        while (ReaderProducerWriterConsumer.getReadState() != -1) {

            readThread = new Thread(new readRunnable(
                    ReaderProducerWriterConsumer));

            writeThread = new Thread(new writeRunnable(
                    ReaderProducerWriterConsumer));

            readThread.start();
            writeThread.start();

            try {
                readThread.join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                writeThread.join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }



    }

}// end main class

因此,多线程版本的问题是读/写操作不会停止,因此写操作会产生损坏的文件。

请提供智能解决方案。谢谢。

4

1 回答 1

0

错误在您的写入过程中,它不会循环:它只会写入一次然后退出,然后读取过程将重新填充缓冲区,并永远等待它再次变为空。

除此之外,你的设计很有趣,但有点不寻常。

它有几个问题:

  • FileInputStream.read 函数不一定返回您请求的字节数,返回值告诉您读取了多少字节。您应该将该值用于您的写入过程。

  • 您正在使用循环来检查empty布尔值作为锁。使用信号量会更容易(参见java.util.concurrent),

  • 或者,您可以使用 BlockingQueue 来同步您的读取和写入。

  • 还有一种称为循环缓冲区的数据结构,它可以让您在缓冲区中以“连续方式”存储读取的数据,但我不知道它是否存在于 Java 中。基本上,您在缓冲区中保留 2 个指针,一个指向缓冲区中可用剩余空间的开头,一个指向已用空间的开头。当您写入缓冲区时,您开始在可用空间的位置存储数据。如果到达缓冲区的末尾,则返回开始(因此是“循环”的想法)。您必须注意不要传递已用空间指针,但如果您的读取操作正确(也就是说,您要求的不超过剩余的可用空间),您应该能够避免这个问题。当你从缓冲区读取时,你从使用的空间指针开始,并读取缓冲区中可用的尽可能多的字节。也许您应该在稍后阶段尝试这种方法。

于 2012-10-28T20:41:40.550 回答