大家好,有客户端服务器聊天,我尝试对其进行负载测试。我使用我的协议,它看起来像 XMPP。我发送 XML 并解析它。如果我启动服务器,对于某些用户来说它可以正常工作。但是我正在进行负载测试,并且正在启动很多用户并从每个用户发送消息。在测试中我没有创建新客户端,我只是用输出流实例化一个输出线程并使用它发送消息。服务器将收到的消息发送给所有用户,因此我创建了一个用户来监听其他用户。有时我会收到异常:软件导致连接中止:recv failed这是我的控制台:
06:55:49 Guest 9 (online) says : Hello Server. Message number^4
06:55:49 Guest 9 (online) says : Hello Server. Message number^5
06:55:49 Guest 11 (online) says : Hello Server. Message number^0
06:55:49 Guest 4 (online) says : Hello Server. Message number^6
ERROR ServerThread - Error in reading from stream: java.net.SocketException: Software caused connection abort: recv failed
ERROR ServerThread - Error in reading from stream: java.net.SocketException: Software caused connection abort: recv failed
06:55:49 Guest 9 (online) says : Hello Server. Message number^6
这是我的服务器线程。我跳过了它正在等待用户的部分。
public class ServerThread implements Runnable {
private static final Logger LOG = Logger.getLogger(ServerThread.class);
private XMLProtocol protocol;
private Socket socket;
private BufferedReader input;
private PrintWriter out;
private static Date date;
private String username;
private String status = "online";
private SimpleDateFormat dateFormat;
private String buffer = "";
private JAXBContext jaxbContext;
private Unmarshaller jaxbUnmarshaller;
public ServerThread(Socket s) throws SAXException, IOException, JAXBException {
input = new BufferedReader(new InputStreamReader(s.getInputStream()));
jaxbContext = JAXBContext.newInstance(XMLProtocol.class);
out = new PrintWriter(s.getOutputStream());
username = "Guest " + Server.getUserCounter();
dateFormat = new SimpleDateFormat("hh:mm:ss");
Server.addUser(username, out);
date = new Date();
socket = s;
new Thread(this);
}
public void run() {
try {
while (true) {
if (input.ready()) {
if (buffer.length() <= 256) {
if ((buffer = input.readLine()).toString().endsWith("</XMLProtocol>")) {
protocol = new XMLProtocol();
jaxbUnmarshaller = jaxbContext.createUnmarshaller();
protocol = (XMLProtocol) jaxbUnmarshaller.unmarshal(new StreamSource(new StringReader(buffer)));
switch (ChatCommands.valueOf(protocol.getStatus())) {
case LOGIN: {
Server.sendToAll(Server.buildResponce("User: " + this.username + " Has been changed nickname on "
+ protocol.getContent()));
this.username = protocol.getContent();
break;
}
case STATUS: {
Server.sendToAll(Server.buildResponce("The user: " + this.username + " Is now:" + protocol.getContent()));
this.status = protocol.getContent();
break;
}
case LOGOUT: {
Server.sendResponce(Server.buildResponce(ResponseCommands.DISCONNECT), out);
quit();
break;
}
default: {
LOG.trace("Getting message from user: " + username + " recived message: " + protocol.getContent());
date = Calendar.getInstance().getTime();
Server.sendToAll(Server.buildResponce(dateFormat.format(date.getTime()) + " " + username + " ("
+ this.status + ") " + " says : " + protocol.getContent()));
break;
}
}
}
} else {
Server.sendResponce(Server.buildResponce(ResponseCommands.SENDING_FAILED), out);
}
}
}
} catch (IOException e) {
LOG.error("Error in reading from stream: " + e);
} catch (JAXBException e) {
LOG.error("Error in Marshalling: " + e);
} finally {
try {
Server.sendResponce(Server.buildResponce(ResponseCommands.UNEXPECTED), out);
quit();
LOG.trace("Socket closed");
} catch (IOException | JAXBException e) {
LOG.error("Socket no closed" + e);
}
}
}
public void quit() throws IOException, JAXBException {
Server.sendToAll(Server.buildResponce("User: " + this.username + " quited"));
Server.removeUser(out);
socket.shutdownInput();
socket.shutdownOutput();
socket.close();
}
}
这是我的测试
public class ServerLoadTest {
private static ExecutorService exec = Executors.newFixedThreadPool(1000);
private static Socket s[] = new Socket[50];// = new Socket();
public static void main(String[] args) throws JAXBException, UnknownHostException, IOException, InterruptedException,
XMLStreamException, ParserConfigurationException, SAXException {
exec.execute(new TestServerThread()); // start Server thread
Thread.sleep(500); // wait till Server starts.
s[0] = new Socket("localhost", 4444);
exec.execute(new InputThread(s[0], new BufferedReader(new InputStreamReader(s[0].getInputStream())))); // Start
// one
for (int i = 0; i < 20; i++) {
exec.execute(new TestClientThread());
}
}
}
class TestClientThread implements Runnable {
private static final Logger LOG = Logger.getLogger(TestClientThread.class);
private XMLProtocol protocol;
private JAXBContext jaxbContext;
private Marshaller jaxbMarshaller;
private Socket socket;
private OutputStream outputStream;
public TestClientThread() throws JAXBException, UnknownHostException, IOException, InterruptedException, XMLStreamException,
ParserConfigurationException, SAXException {
jaxbContext = JAXBContext.newInstance(XMLProtocol.class);
jaxbMarshaller = jaxbContext.createMarshaller();
socket = new Socket("localhost", 4444);
protocol = new XMLProtocol();
outputStream = socket.getOutputStream();
new Thread(this);
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
protocol.setContent("Hello Server. Message number^" + i);
protocol.setStatus(ChatCommands.MSG.getCommandCode());
jaxbMarshaller.marshal(protocol, outputStream);
}
protocol.setContent("Hello Server. Message number^");
protocol.setStatus(ChatCommands.LOGOUT.getCommandCode());
jaxbMarshaller.marshal(protocol, outputStream);
/* socket.shutdownInput();
socket.shutdownOutput();
socket.close();
Thread.currentThread().interrupt();*/
} catch (JAXBException e) {
LOG.trace("Error in marshaling ");
}
}
}
class TestServerThread implements Runnable {
private Server server;
public TestServerThread() {
new Thread(this);
}
@SuppressWarnings("static-access")
@Override
public void run() {
try {
server.main(null);
} catch (IOException | JAXBException | ParserConfigurationException | SAXException e) {
Assert.assertFalse(false);
}
}
}