0

我遇到了与以下描述相同的问题: https ://stackoverflow.com/questions/17196588/java-io-streamcorruptedexception-invalid-type-code-ac-client-server

但是,我看不到我是如何创建多个 ObjectOutputStream 的。我确信 OP 收到了正确的答案,并且我确信我正在以某种方式创建多个实例,但我不知道如何。

   public class Node {
    public static void main(String[] args)
    {

        File file = new File("hotwords.txt");
        AppendableObjectOutputStream  oos = null;
        OutputStream outStream = null;
        long fileSize = file.length();
        ArrayList<String> hotwords = new ArrayList<String>();
        try
        {
        BufferedReader br = new BufferedReader(new FileReader(file));
        String CurrentLine;
        while (( CurrentLine = br.readLine()) != null) {
            hotwords.add(CurrentLine);
            System.out.println("HOTWORD: " + CurrentLine);
            }
        br.close();
        }
        catch(Exception e) {
            e.printStackTrace();
            System.exit(0);

        }
        Socket s = null;
        try{
         s = new Socket("server", 8189);
         PrintWriter writer = new PrintWriter(s.getOutputStream(), true);
         writer.println("NODE");
        outStream = s.getOutputStream();
        oos = new AppendableObjectOutputStream(outStream); 
        oos.flush();


         }
         catch(Exception e)
         {
            e.printStackTrace();
            System.exit(1);

        }
        try{
        String os = System.getProperty("os.name").toLowerCase();
        File logs;
        if(os.matches("windows"))
        {
        logs = new File(".../logs");
        System.out.println("Opening windows directory");

        }
        else
        {
            logs = new File("...logs");
            System.out.println("Opening linux directory");
        }

        for( File f : logs.listFiles() )
        {
            if(f.getName().matches("machine.log"))
            //if(f.getName().matches(".*log$"))
            {
                System.out.println("FOUND LOG " + f);
                Runnable r  = new FileHandler(s, oos, f, hotwords, file, fileSize);
                Thread t = new Thread(r);
                t.start();
            }
        }
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }

    }
}



    FileHandler.java /* This will create a thread for a log file to continuously read through it*/

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;

public class FileHandler implements Runnable {

    Socket c;
    File file;
    ArrayList<String> hotwords;
    long hws;
    File hwf;
    AppendableObjectOutputStream oos;


    public FileHandler(Socket conn, AppendableObjectOutputStream oos , File f, ArrayList<String> h, File hotwordFile, long hotwordSize)
    {
        c=conn;
        file=f;
        hotwords = h;
        hws = hotwordSize;
        hwf=hotwordFile;
        this.oos = oos;
    }

    public void run()
    {

        System.out.println("FILEHANDLER:THREAD STARTED");
        String sCurrentLine;
        BufferedReader br = null;
        try {
        br = new BufferedReader( new FileReader(file) );
        }
        catch(FileNotFoundException e)
        {
            e.printStackTrace();
        }
        HashMap<String, LinkedHashSet<String> > temp = new HashMap<String, LinkedHashSet<String> >();
        temp.put("FILEMON", new LinkedHashSet<String>() );
        try {
        //OutputStream outStream = c.getOutputStream();
        //AppendableObjectOutputStream  oos = new AppendableObjectOutputStream(outStream);   moved to cache node so everyone share same output stream
        boolean test = true;
        while(test)
        {
            if(hwf.length() != hws)
            {
                hws = hwf.length();
                hotwords.clear();
                try
                {
                BufferedReader hbr = new BufferedReader(new FileReader(file));
                String CurrentLine;
                while (( CurrentLine = hbr.readLine()) != null) {
                    hotwords.add(CurrentLine);
                    System.out.println("HOTWORD: " + CurrentLine);
                    }
                hbr.close();
                }
                catch(Exception e) {
                    e.printStackTrace();
                    System.exit(0);

                }

            }
            while((sCurrentLine = br.readLine()) != null)
            {
                System.out.println(sCurrentLine);
                for( String h : hotwords)
                {

                    if( sCurrentLine.matches(h) )
                    {
                        System.out.println("FILEHANDLER:FOUND MATCHING LINE " + sCurrentLine);
                        temp.get("FILEMON").add(file.getName() + ": " + sCurrentLine);
                        break;
                    }
                }
            }

            if(!temp.get("FILEMON").isEmpty())
            {
                if(c.isConnected())
                { oos.writeObject(temp); oos.reset(); }
            System.out.println("NODE:PRINTED OBJECT: Size of FILEMON " + temp.get("FILEMON").size() + " with id: " + temp.toString());
            temp.get("FILEMON").clear();
            System.out.print("NODE:SIZE OF FILEMON AFTER CLEAR: "  + temp.get("FILEMON").size());

            }


        }
        br.close();
        }
        catch(IOException e)
        {
            e.printStackTrace();

        }
    }
}


    Hub.java /*This is a hub that runs on a seperate machine which recieves data from nodes*/       

   public class CacheMonitorHub {
    public static void main(String[] args)
    {
        Map<Socket, AppendableObjectOutputStream> clients = Collections.synchronizedMap(new HashMap<Socket, AppendableObjectOutputStream>());
        try
        {
            ServerSocket s = new ServerSocket(8189);
            while(true)
            {
                Socket incoming = s.accept();
                System.out.println("Spawning " + incoming);
                Runnable r = new ConnectionHandler(incoming, clients);
                Thread t = new Thread(r);
                t.start();
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}





    Handler.java /*Lastly, this is responsible for publishing messages to clients*/
   import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

public class ConnectionHandler implements Runnable {

    Map<Socket, AppendableObjectOutputStream> Sockets;
    Socket incoming;

    public ConnectionHandler(Socket socket, Map<Socket, AppendableObjectOutputStream> others)
    {
        incoming = socket;
        Sockets = others;

    }

    public void run()
    {

        InputStream inStream = null;
        OutputStream outStream = null;
        ObjectInputStream ois= null;
        AppendableObjectOutputStream oos =null;
        try{
        inStream = incoming.getInputStream();
        outStream = incoming.getOutputStream();


        }
        catch(IOException e)
        {
            e.printStackTrace();
        }
        System.out.println("Creating Scanner..");
        Scanner in = new Scanner(inStream);
        //PrintWriter out = new PrintWriter(outStream, true /* autoFlush */);
        String clientOrNode = "";

        clientOrNode = in.nextLine();
        System.out.println("HUB: " + clientOrNode);

        if(clientOrNode.equals("CLIENT"))
        {

            System.out.println("HUB:FOUND A CLIENT!");
            /*
            AppendableObjectOutputStream  oos = null;

            try{
            oos = new AppendableObjectOutputStream(outStream);

            }
            catch(IOException e)
            {
                e.printStackTrace();
                System.exit(0);
            }
            */
            try{
            oos = new AppendableObjectOutputStream(outStream);
            }
            catch(IOException e)
            {
                e.printStackTrace();
            }
            Sockets.put(incoming, oos);
        }
        else if ( clientOrNode.equals("NODE") )
        {
            try {
                ois = new ObjectInputStream(inStream);

            }
            catch(IOException e){
                e.printStackTrace();
            }
            System.out.println("HUB:FOUND A NODE!");
            System.out.println("HUB:ABOUT TO ENTER WHILE");
            while(1==1)
            {
                try{
                    System.out.println("HUB:IN WHILE LOOP ABOUT TO READ OBJECT");
                HashMap<String, LinkedHashSet<String>> temp =  null;

                try {
                temp = (HashMap<String, LinkedHashSet<String>>) ois.readObject();
                }
                catch(Exception e)
                {
                    e.printStackTrace();
                }

                System.out.println("HUB:OBJECT RECIEVED " + temp.toString());


                for(Socket s : Sockets.keySet())
                {   

                        System.out.println("HUB:WRITING OBJECT NOW TO " + s.toString());
                        try {
                        Sockets.get(s).writeObject(temp);
                        Sockets.get(s).reset();
                        }
                        catch(Exception e)
                        {
                            Sockets.remove(s);
                        }


                }
                System.out.println("PAST FOR LOOP!!");
                }
                catch(Exception e)
                {
                    e.printStackTrace();
                }
                try {

                Thread.sleep(200);
                }
                catch(Exception e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}

    AppendableObjectOutputStream /*Just tried adding this as seen on suggestion from other post but not helping*/

    import java.io.ObjectOutputStream;
    import java.io.OutputStream;
    import java.io.IOException;

    public class AppendableObjectOutputStream extends ObjectOutputStream {

      public AppendableObjectOutputStream(OutputStream out) throws IOException {

        super(out);

      }

      @Override
      protected void writeStreamHeader() throws IOException {
        // do not write a header, but reset:
        // this line added after another question
        // showed a problem with the original
        reset();
      }

    }

关于为什么我得到 java.io.StreamCorruptedException: invalid type code: AC 的任何想法?

4

1 回答 1

0

除非该FileHandler.run()方法是同步的,或者其中有内部同步,否则两者都不是真的,我看不出你怎么能期望它起作用。您正在从多个线程写入相同ObjectOutputStream的内容:您将获得数据交错。任何事情都可能发生在接收器上。

NB 测试isConnected()并没有完成任何有用的事情。您在创建它时确实连接了Socket,它,并且isConnected()会继续告诉您,即使在您关闭它之后也是如此。例如,它不会告诉您连接是否仍然存在。

于 2013-07-31T23:27:06.207 回答