
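I'm building a Python-based interface to pull data from an instrument over TCP. The data stream arrives as discrete events, and the timing is not steady: I get bursts of data followed by slow periods. The packets are small, so for simplicity, assume they arrive as complete packets.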

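Here's the behavior I'm getting from the socket:

  • Send event #1: socket.recv returns event #1
  • Send event #2: socket.recv returns event #2
  • Send events #3-50 quickly: socket.recv returns only events #3-30 (returns 27 times)
  • Send event #51 slowly: socket.recv returns event #31
  • Send event #52 slowly: socket.recv returns event #32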

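No data is lost. But evidently a buffer somewhere has filled up, and the socket is now returning old data. Shouldn't recv keep returning until that buffer is empty? Instead, it only returns when a new packet arrives, despite the backlog of packets that has built up. Weird!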

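Here's the essence of the code (this is the non-blocking version; I've also done it blocking with just recv, with the same result). I stripped out all the packet reassembly stuff for simplicity. I've carefully traced the problem back to the socket, so I know the reassembly isn't the culprit.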

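import socket
import struct
import threading
from select import select
from time import sleep

try:
    import Queue              # Python 2
except ImportError:
    import queue as Queue     # Python 3

class mysocket:
    def __init__(self, ip, port):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((ip, port))
        self.keepConn = True
        self.socket.setblocking(0)
        # Queues shared between the socket thread and the parser
        self.recvqueue = Queue.Queue()
        self.sendqueue = Queue.Queue()
        threading.Thread(target=self.rcvThread).start()
        threading.Thread(target=self.parseThread).start()

    def rcvThread(self):
        while self.keepConn:
            readable, writable, inError = select([self.socket], [self.socket], [], .1)
            if readable:
                # Whatever recv() returns is queued as one "packet"
                packet = self.socket.recv(4096)
                self.recvqueue.put_nowait(packet)
            try:
                xmitmsg = self.sendqueue.get_nowait()
            except Queue.Empty:
                pass
            else:
                if writable:
                    self.socket.send(xmitmsg)

    def parseThread(self, rest=.1):
        while self.keepConn:
            try:
                output = self.recvqueue.get_nowait()
                # Event number: first two bytes, little-endian unsigned short
                eventnumber = struct.unpack('<H', output[:2])
                print(eventnumber)
            except Queue.Empty:
                sleep(rest)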

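Why can't I get the socket to dump all the data in its buffer? I can never catch up! This one is really bizarre. Does anyone have any pointers?

I'm an amateur, but I've really done my homework on this one, and I'm completely baffled.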


1 Answer

packet = self.socket.recv(4096)
self.recvqueue.put_nowait(packet)

TCP is a stream-based protocol, not a message-based one. It doesn't preserve message boundaries, so you can't expect one recv() call per message. If you send data in a burst, Nagle's algorithm may combine it into fewer TCP packets, and whatever has accumulated in the receive buffer comes back from the next recv() as a single chunk.

Your code assumes that each recv() call returns one "packet", and the parse thread prints the first number from each "packet". But recv() doesn't return packets, it returns chunks of data from the TCP stream. These chunks can contain one message or multiple messages or even partial messages. There's no guarantee that the first two bytes are always event numbers.

Typically, reading data from a TCP connection involves calling recv() multiple times and storing the data you get in a buffer. Once you've received an entire message then you remove the appropriate number of bytes from the buffer and process them.
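The buffering approach described above might look roughly like this. It is only a sketch: it assumes every message has the same fixed size (MESSAGE_SIZE is a made-up value, the question doesn't state the real one), and StreamBuffer and event_number are hypothetical names, not part of the original code.

```python
import struct

MESSAGE_SIZE = 8  # hypothetical fixed message length in bytes


class StreamBuffer(object):
    """Accumulates chunks from recv() and hands back whole messages."""

    def __init__(self, message_size=MESSAGE_SIZE):
        self.message_size = message_size
        self.buffer = b""

    def feed(self, chunk):
        """Append a chunk and return every complete message now available."""
        self.buffer += chunk
        messages = []
        while len(self.buffer) >= self.message_size:
            messages.append(self.buffer[:self.message_size])
            self.buffer = self.buffer[self.message_size:]
        # Any trailing partial message stays in self.buffer for the next call
        return messages


def event_number(message):
    # Assumes, as in the question, a little-endian unsigned 16-bit
    # event number in the first two bytes of each message.
    return struct.unpack("<H", message[:2])[0]
```

In rcvThread, each chunk from self.socket.recv(4096) would go through feed(), and every message it returns would be put on recvqueue individually, so the parse thread sees one event per queue item no matter how the stream was chunked.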

If you have variable-length messages then you need to keep track of message boundaries yourself. TCP doesn't do it for you like UDP does. A common way to do that is to add a header containing the message length to the front of each message.
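As an illustration of length-prefixed framing, here is a minimal sketch. The 2-byte little-endian length header is an assumption made for the example, and frame and extract_messages are hypothetical helper names; a real instrument protocol defines its own header layout.

```python
import struct

HEADER = struct.Struct("<H")  # hypothetical 2-byte little-endian length prefix


def frame(payload):
    """Prefix a payload with its length so the receiver can find the boundary."""
    return HEADER.pack(len(payload)) + payload


def extract_messages(buffer):
    """Split off every complete message; return (messages, leftover bytes)."""
    messages = []
    while len(buffer) >= HEADER.size:
        (length,) = HEADER.unpack(buffer[:HEADER.size])
        end = HEADER.size + length
        if len(buffer) < end:
            break  # payload not fully received yet
        messages.append(buffer[HEADER.size:end])
        buffer = buffer[end:]
    return messages, buffer
```

The receiver keeps the leftover bytes, appends the next recv() chunk to them, and calls extract_messages again, so a message split across two chunks is reassembled before it is processed.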

try:
   xmitmsg = self.sendqueue.get_nowait()
except Queue.Empty:
   pass
else:
   if writable:
       self.socket.send(xmitmsg)

On another note, it looks like this code has a bug. It removes messages from the sendqueue whether or not the socket is writable. If the socket's not writable it'll silently throw away messages.
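One possible way to avoid that, sketched under assumptions: only take a message off the queue when the socket is writable, and keep whatever send() did not accept for the next pass. send_pending and its pending argument are hypothetical names introduced for this example.

```python
try:
    import Queue              # Python 2
except ImportError:
    import queue as Queue     # Python 3


def send_pending(sock, sendqueue, pending, writable):
    """Send queued data only when the socket is writable.

    `pending` holds bytes left over from an earlier partial send.
    Returns the bytes that still have to be sent on a later call.
    """
    if not writable:
        return pending  # leave the queue untouched, so nothing is lost
    if not pending:
        try:
            pending = sendqueue.get_nowait()
        except Queue.Empty:
            return b""
    sent = sock.send(pending)  # send() may accept only part of the data
    return pending[sent:]
```

In rcvThread this would replace the try/except/else block, with something like self.pending = send_pending(self.socket, self.sendqueue, self.pending, writable), where self.pending starts out as b"".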

answered 2013-11-02T03:36:54.047