1

我创建了使用 java 实现的普通发布者和订阅者,它按大小读取内容,总大小为 5MB,每 1MB 发布一次给订阅者。数据正在成功发布。现在正面临附加问题内容到现有文件。最后我只能在文件中找到最后 1MB 的数据。所以请让我知道如何解决这个问题?我还附上了发布者和订阅者的源代码。

Publisher:

public class MessageDataPublisher {
    static StringBuffer fileContent;
    static RandomAccessFile randomAccessFile ;

    public static void main(String[] args) throws IOException {
        MessageDataPublisher msgObj=new MessageDataPublisher();

        String fileToWrite="test.txt";
        msgObj.towriteDDS(fileToWrite);
    }


    public void towriteDDS(String fileName) throws IOException{

        DDSEntityManager mgr=new DDSEntityManager();
        String partitionName="PARTICIPANT";



        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport binary=new BinaryFileTypeSupport();
        mgr.registerType(binary);


        // create Topic
        mgr.createTopic("Serials");

        // create Publisher
        mgr.createPublisher();

        // create DataWriter
        mgr.createWriter();

        // Publish Events

        DataWriter dwriter = mgr.getWriter();
        BinaryFileDataWriter binaryWriter=BinaryFileDataWriterHelper.narrow(dwriter);


        int bufferSize=1024*1024;


        File readfile=new File(fileName);
        FileInputStream is = new FileInputStream(readfile);
        byte[] totalbytes = new byte[is.available()];
        is.read(totalbytes);
        byte[] readbyte = new byte[bufferSize];
        BinaryFile binaryInstance;

        int k=0;
        for(int i=0;i<totalbytes.length;i++){
            readbyte[k]=totalbytes[i];
            k++;
            if(k>(bufferSize-1)){
                binaryInstance=new BinaryFile();
                binaryInstance.name="sendpublisher.txt";
                binaryInstance.contents=readbyte;
                int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                ErrorHandler.checkStatus(status, "MsgDataWriter.write");

                k=0;
                }

        }
        if(k < (bufferSize-1)){
            byte[] remaingbyte = new byte[k];                   
            for(int j=0;j<(k-1);j++){
                remaingbyte[j]=readbyte[j];
            }
            binaryInstance=new BinaryFile();
            binaryInstance.name="sendpublisher.txt";
            binaryInstance.contents=remaingbyte;
            int status = binaryWriter.write(binaryInstance, HANDLE_NIL.value);
            ErrorHandler.checkStatus(status, "MsgDataWriter.write");

        }       
        is.close();


        try {
            Thread.sleep(4000);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // clean up
        mgr.getPublisher().delete_datawriter(binaryWriter);
        mgr.deletePublisher();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }




}


Subscriber:


public class MessageDataSubscriber {
    static RandomAccessFile randomAccessFile ;
    public static void main(String[] args) throws IOException {
        DDSEntityManager mgr = new DDSEntityManager();
        String partitionName = "PARTICIPANT";

        // create Domain Participant
        mgr.createParticipant(partitionName);

        // create Type
        BinaryFileTypeSupport msgTS = new BinaryFileTypeSupport();
        mgr.registerType(msgTS);

        // create Topic
        mgr.createTopic("Serials");

        // create Subscriber
        mgr.createSubscriber();

        // create DataReader
        mgr.createReader();

        // Read Events
        DataReader dreader = mgr.getReader();
        BinaryFileDataReader binaryReader=BinaryFileDataReaderHelper.narrow(dreader);
        BinaryFileSeqHolder binaryseq=new BinaryFileSeqHolder();
        SampleInfoSeqHolder infoSeq = new SampleInfoSeqHolder();
        boolean terminate = false;
        int count = 0;

        while (!terminate && count < 1500) {
             // To run undefinitely
            binaryReader.take(binaryseq, infoSeq, 10,
                    ANY_SAMPLE_STATE.value, ANY_VIEW_STATE.value,ANY_INSTANCE_STATE.value);
                for (int i = 0; i < binaryseq.value.length; i++) {
                    toWrtieXML(binaryseq.value[i].contents);
                    terminate = true;
            }

            try
            {
                Thread.sleep(200);
            }
            catch(InterruptedException ie)
            {
            }
            ++count;

        }
            binaryReader.return_loan(binaryseq,infoSeq);

        // clean up

        mgr.getSubscriber().delete_datareader(binaryReader);
        mgr.deleteSubscriber();
        mgr.deleteTopic();
        mgr.deleteParticipant();

    }

    private static void toWrtieXML(byte[] bytes) throws IOException {
        // TODO Auto-generated method stub
        File Writefile=new File("samplesubscriber.txt");
        if(!Writefile.exists()){
            randomAccessFile = new RandomAccessFile(Writefile, "rw");
            randomAccessFile.write(bytes, 0, bytes.length);
            randomAccessFile.close();
            }
            else{
                randomAccessFile = new RandomAccessFile(Writefile, "rw");
                long i=Writefile.length();
                randomAccessFile.seek(i);
                randomAccessFile.write(bytes, 0, bytes.length);
                randomAccessFile.close();
            }


    }
}

提前致谢

4

1 回答 1

1

很难对您的问题给出结论性的答案,因为您的问题可能是多种不同原因造成的。此外,一旦确定了问题的原因,您可能会有多种选择来缓解它。

首先要看的地方是读者方面。代码take()循环执行,每次拍摄之间有 200 毫秒的暂停。根据您在 DataReader 上的 QoS 设置,您可能会遇到这样的情况:当您的应用程序休眠 200 毫秒时,您的样本在 DataReader 中被覆盖。如果您通过千兆以太网执行此操作,那么典型的 DDS 产品将能够在该睡眠期间执行这 5 个 1 兆字节的块,这意味着您的默认单一缓冲区将在您的睡眠期间被覆盖 4 次。

如果您使用默认的历史 QoS 设置,则可能会出现这种情况BinaryFileDataReader,这意味着history.kind = KEEP_LASThistory.depth = 1。将后者增加到更大的值,例如增加到 20,将导致队列能够在您睡觉时保存 20 个文件块。现在应该足够了。

如果这不能解决您的问题,则可以探索其他可能的原因。

于 2012-07-16T03:31:10.147 回答