3

我正在使用 socket.recv() 通过套接字(蓝牙)从设备接收二进制数据。

我考虑过在列表或字节数组中进行缓冲,直到接收到足够的数据以进行解码和操作。也就是说,直到我收到一个开始和一个停止标志(字节)。

但是,该设备正在应用“八位字节填充”。也就是说,以下两个字节的每次出现都应替换为单个字节,如下所示:

dic = { '\xFE\xDC' : '\xFC' , '\xFE\xDD' : '\xFD' , '\xFE\xDE' : '\xFE'}

此外,在发送数据时,应采用相反的方法。例如,一个字节 0xFC -> 产生两个字节 0xFE、0xDC。

真正发生的是,当填充(发送数据)时,如果检测到 0xFC、0xFD、0xFE 之一,则在字节之前添加 0xFE,该字节本身就是 XORed 0x20。在 unstuffing(接收数据)时,0XFE 被丢弃,接下来的字节是 XORed 0x20。

说我对 Python 还很陌生,这是轻描淡写的。我昨天开始编码,有一个界面启动并运行。然而,这有点棘手。

我知道我可以将数据放入字符串并进行替换。但是要将二进制数据打包成一个字符串,替换然后解包和解码似乎效率有点低。

当我看到 0xFE 标志时,我还可以观察传入的数据并采取行动。最好的方法是填充/取消填充列表、字节数组或其他任何东西。

替换列表或字节数组中的单个字节似乎并不太难,但是用两个或相反的方式替换一个......?

非常感谢任何帮助。

(顺便说一下,这是 Python 2.7。)

4

1 回答 1

6

Overview

You need to wrap a bytestream and escape specific values. Also, the other way around is required: unescape control-codes and get the raw payload. You are working with sockets. The socket-commands uses string-parameters. In python, every string is basically a wrapper around a char*-array.

Naive approach

Its a string and we want to replace specific values with other ones. So what is the simplest way to achieve this?

def unstuff(self, s):
    return s.replace('\xFE\xDC', '\xFC').replace('\xFE\xDD', '\xFE').replace('\xFE\xDE', '\xFE')
def stuff(self, s):
    return s.replace('\xFC', '\xFE\xDC').replace('\xFD', '\xFE\xDD').replace('\xFE', '\xFE\xDE')

Seems to be bad. With every replace-call, a new string-copy will be created.

Iterator

A very pythonic approach is to define an iterator for this specific problem: define the iterator to transform the input-data into the desired output.

def unstuff(data):
    i = iter(data)
    dic = {'\xDC' : '\xFC', '\xDD' : '\xFD', '\xFE' : '\xDE'}
    while True:
        d = i.next()  # throws StopIteration on the end
        if d == '\xFE':
            d2 = i.next()
            if d2 in dic:
                yield dic[d2]
            else:
                yield '\xFE'
                yield d2
        else:
            yield d

def stuff(data):
    i = iter(data)
    dic = { '\xFC' : '\xDC', '\xFD' : '\xDD', '\xFE' : '\xDE' }
    while True: 
        d = i.next() # throws StopIteration on the end
        if d in dic:
            yield '\xFE'
            yield dic[d]
        else:
            yield d

def main():
    s = 'hello\xFE\xDCWorld'
    unstuffed = "".join(unstuff(s))
    stuffed = "".join(stuff(unstuffed))
    print s, unstuffed, stuffed

    # also possible
    for c in unstuff(s):
        print ord(c)

if __name__ == '__main__':
    main()

stuff() and unstuff() need something iterable (list, string, ...) and return an iterator-object. If you want to print the result or pass it into socket.send, you need to convert it back to a string (as shown with "".join()). Every unexpected data is handled somehow: 0xFE 0x__ will be returned verbatim, if it does not match any pattern.

RegExp

Another way would be to use regular expressions. Its a big topic and a source of trouble sometimes, but we can keep it simple:

import re

s = 'hello\xFE\xDCWorld' # our test-string

# read: FE DC or FE DD or FE DE
unstuff   = re.compile('\xFE\xDC|\xFE\xDD|\xFE\xDE')
# read: 
#  - use this pattern to match against the string
#  - replace what you have found (m.groups(0), whole match) with
#    char(ord(match[1])^0x20)
unstuffed = unstuff.sub(lambda m: chr(ord(m.group(0)[1])^0x20), s)

# same thing, other way around
stuff   = re.compile('\xFC|\xFD|\xFE')
stuffed = stuff.sub(lambda m: '\xFE' + chr(ord(m.group(0))^0x20), unstuffed)

print s, unstuffed, stuffed

As said, you must create the new string somewhere to be able to use it with sockets. At least, this approach do not create unnecessary copies of the string like s.replace(..).replace(..).replace(..) would. You should keep the patterns stuffand unstuff somewhere around as building these objects is relatively expensive.

native C-Function

If some things are going to slow in python, we might want to use cpython and implement it as oure C-code. Basically, I do a first run, count how many bytes I nerd, allocate a new string and do the second run. I'm not very used to python-c-extensions and so I do not want to share this code. It just seems to work, see the next chapter

Comparison

One of the most important rules of optimization: compare! The basic setup for every test:

generate random binary data as a string
while less_than_a_second:
    unstuff(stuff(random_data))
    count += 1
return time_needed / count

I know, the setup isn't optimal. But we should get some usable result:

Comparing 4 methods

What do we see? Native is the fastest way to go, but only for very small strings. This is probably because of the python-interpreter: only one function-call is needed instead of three. But microseconds is fast enough the most of the times. After ~500 bytes, the timings are nearly the same with the naive approach. There must be some deep magic happening down there in the implementation. Iterators and RegExp are unacceptable compared to the effort.

To sum things up: use the naive approach. Its hard to get something better. Also: if you simply guess about timings, you will be almost always wrong.

于 2012-11-23T14:09:07.143 回答