4

我们正在用 Java (1.6) 开发一个服务器应用程序,它是一个事务服务器,它通过 TCP 套接字侦听连接。每个新连接都会创建一个新线程,该线程在连接关闭之前一直保持活动状态。每个客户端都会将事务发送到将要处理的服务器,然后将响应发送回客户端。

这工作正常。当我们想通过同一个套接字发送许多异步事务(或消息)时,问题就出现了。我编写了一个小型应用程序,它发送 1000 个事务,每个事务之间的间隔为 10 毫秒。该应用程序是异步的,因此发送消息并在中间进行响应。

这是处理传入消息并将它们发送到另一个组件进行处理的部分的代码(该组件有一个线程池):

public void run() {
...
...

  socketBuf = new BufferedInputStream(input);
  baos = new ByteArrayOutputStream();

  while ((bytes_read = socketBuf.read(buffer)) != -1) {

    if (bytes_read < 0) {
        log.error("Tried to read from socket, read() returned < 0,  Closing socket.");
        return;
    }

    baos.write(buffer, 0, bytes_read);
    break;
  }

  if (bytes_read >= 0) {

    baos.flush();
    byte data[] = baos.toByteArray();

    if (data.length > 0) {                      
        GWTranData tData = posMessage.decode(data, false);   
        if (tData.getMessageType() > 0) {

            // Send to the Pre-Online Manager to be processed                       
            PreOnlineJob newJob = new PreOnlineJob(tData);
            newJob.addJobStatusListener(this);
            GWServer.getPreOnlineInstance().addJob(newJob);
        }
    }
  }
  else {
    clientSocket.close();
    break;
  }

} while(true);
  } 

在短时间内发送许多事务时,我们面临的问题是某些消息丢失并且无法到达服务器。通过深入分析,我们发现当消息发送速度过快时,缓冲区中的消息不止一条,因此 data[] 有两条或多条消息但只会执行一条。发送的消息大小为 200 字节,因此 512 的缓冲区绰绰有余。

我实现套接字读取的方式有什么问题吗?有没有更好的办法?

多谢你们。

4

2 回答 2

7

问题在于您使用从套接字读取的字节的方式。您的假设是每次阅读都会收到一条“消息”。这个假设是错误的 - TCP 不知道您的应用程序消息边界,但会为您提供字节,因此您可以一次获取多条消息,或消息的一部分,或两者兼而有之。

您必须缓冲接收到的流中未处理的部分,检查是否收到完整的消息,再阅读一些内容,直到完成,处理消息,然后循环继续。

编辑0:

有几种方法可以在 TCP 之上设计应用程序级协议:

  • 固定长度的消息(简单),
  • 分隔消息(需要明确的字节序列来指定消息的结束/开始,如SOH在 FIX 或\r\nHTTP 中),
  • @Thomas 在评论中建议的长度前缀消息,
  • “自我描述”的消息——比如 s 表达式或其他什么,需要被解析,
  • 也许其他人。
于 2012-08-16T14:53:18.890 回答
1

您需要调整代码以在消息到达时对其进行处理。目前,您正在将从 connect 到 EOS 的整个流累积到一个巨大的字节数组中,然后像只包含一条消息一样处理它。从“丢失”消息的角度来看,这不仅是错误的(是你在丢失),而且非常浪费时间和空间。您无需等待 EOS 即可处理第一条消息。你需要弄清楚如何读取流,直到你有一条消息,处理它,然后重复处理下一条消息,终止于 EOS。

于 2012-08-16T23:59:12.593 回答