1

首先,我知道关于这个确切的例外有很多问题,但似乎没有一个有很好的答案。

我有一个本地客户端网络,通过 TCP 套接字与服务器通信。他们正在从服务器接收大量数据。当我开始我的测试时,它运行了一段时间,但最终我在其中一个流上看到了这个异常。

java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1373)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
    at myProgram.connectionProtocol.waitForMsg(connectionProtocol.java:137)
    at myProgram.Client.ClientProcess$testController.run(ClientProcess.java:296)
    at myProgram.Client.ClientProcess.main(ClientProcess.java:71)

更新:

我尝试强制每个进程在继续之前等待确认消息,并在遇到此错误时发送重新发送。这增加了更多的复杂性,并且似乎导致的问题比它解决的问题更多。

我向所有输入添加了缓冲流(在代码中更新),听说这会提高 IO 性能,但没有运气。不过它并没有引起问题,所以我将它留在里面。

这是我的一些代码(已删除垃圾):

服务器:

private Socket client;
private String clientName = "?";
private Boolean threadActive = true;
private PublicKey clientKey;
private int timeout = 100;

ClientWorker(Socket sock) {
    client = sock;
}
public void run(){
    Object line;
    ObjectInputStream in = null;
    ObjectOutputStream out = null;

    client.setSoTimeout(timeout);
    in = new ObjectInputStream(new BufferedInputStream(client.getInputStream()));
    out = new ObjectOutputStream(new BufferedOutputStream(client.getOutputStream()));
    out.flush();

    while(threadActive){

        Message msgOut = msgQueue.get(0);
        if(msgOut != null && msgOut.source.equals(clientName)){
            msgQueue.remove(0);
            msgOut.source = "broker";
            out.writeObject(secure.signObject(msgOut,privKey));
            out.flush();
            out.reset();
        } 

        try{
            line = in.readObject();
            handleMessage((SignedObject) line);
        } catch(SocketTimeoutException e){
            //no data currently in stream
        }
    }
    client.close();
}

客户:

serverSock = new connectionProtocol(name,privKey);
socketNum = serverSock.getSocketNum("server");
serverSock.openSocket(socketNum);
serverSock.setSoTimeout(100);

while(isActive){
    // ERROR OCCURS IN NEXT LINE
    serverMsg = (SignedObject)serverSock.waitForMsg();
    if(serverMsg != null)
        handleMessage(brokerMsg);

    // messages are sent/received over other sockets here
}
serverSock.closeSocket();

周期性地,另一个线程也会调用 serverSock.sendMsg(Message)。

连接协议:

public void openSocket(int sockNum) throws IOException {
    socket = new Socket("localhost", sockNum);
    out = new ObjectOutputStream(socket.getOutputStream());
    in = new ObjectInputStream(socket.getInputStream());
    out.flush();


    Message initMessage = new Message(name,"open");
    SignedObject sigMessage = security.signObject(initMessage, privKey);
    out.writeObject(sigMessage);
    out.flush();
    out.reset();

    socketActive = true;
}

public synchronized void closeSocket(){
    socketActive = false;
    Message closeMessage = new Message(name,"end");
    out.writeObject(security.signObject(closeMessage,privKey));
    out.flush();
    socket.close();
}

public synchronized Object waitForMsg() throws IOException {
    Object msg = null;
    try {
        // ERROR OCCURS ON NEXT LINE
        msg = in.readObject();
        return msg;
    } catch (SocketTimeoutException e) {
        return null;
    }catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    return msg;
}

public synchronized void sendMsg(Message msg){
    out.writeObject(security.signObject(msg, privKey));
    out.flush();
    out.reset();
}
4

0 回答 0