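I am trying to combine the power of Twisted's Protocol with the flexibility of the declarative binary data parser construct.
So far, my MessageReceiver protocol accumulates the data coming from the TCP channel as follows: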
def rawDataReceived(self, data):
    '''
    This method buffers the data coming from the TCP channel in the following way:
    - Initially, discard the stream until a reserved character is detected
    - Add data to the buffer up to the expected message length, unless the reserved character is met again. In that case discard the message and start again
    - If the expected message length is reached, attempt to parse the message and clear the buffer
    '''
    if self._buffer:
        index = data.find(self._reserved_character)
        if index > -1:
            # A new delimiter arrived: dispatch the buffered message only if it is long enough.
            if len(self._buffer) + index >= self._fixed_size:
                self.on_message(self._buffer + data[:index])
            self._buffer = b''
            data = data[index:]
            # Dispatch every delimiter-prefixed chunk, truncated to the fixed size.
            for msg in data.split(self._reserved_character):
                if msg:
                    self.on_message((self._reserved_character + msg)[:self._fixed_size])
        elif len(self._buffer) + len(data) < self._fixed_size:
            self._buffer = self._buffer + data
        else:
            self._buffer = b''
    else:
        try:
            # Discard everything before the first delimiter; index() raises ValueError if there is none.
            data = data[data.index(self._reserved_character):]
            for msg in data.split(self._reserved_character):
                if msg:
                    self.on_message((self._reserved_character + msg)[:self._fixed_size])
        except Exception as exc:
            log.msg("Warning: Maybe there is no delimiter {delim} for the new message. Error: {err}".format(delim=self._reserved_character, err=str(exc)))
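Now I need to evolve the protocol to account for the fact that messages may or may not carry optional fields (so there is no longer a fixed message length).
I modeled (a meaningful part of) the message parser with construct as follows: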
def on_message(self, msg):
    return Struct(HEADER,
        Bytes(HEADER_RAW, 3),
        BitStruct(OPTIONAL_HEADER_STRUCT,
            Nibble(APPLICATION_SELECTOR),
            Flag(OPTIONAL_HEADER_FLAG),
            Padding(3)
        ),
        If(lambda ctx: ctx.optional_header_struct[OPTIONAL_HEADER_FLAG],
            Embed(Struct(None,
                Byte(BATTERY_CHARGE),
                Bytes(OPTIONAL_HEADER, 3)
                )
            )
        )
    ).parse(msg)
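So now I need to change the buffering logic to pass a chunk of the right size to the Struct. I would like to avoid sizing the data passed to the Struct inside the rawDataReceived method, given that the rules for what counts as a candidate message are already known to the construct object.
Is there any way to push the buffering logic into the construct object?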
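To make the sizing problem concrete, the length rule implied by the Struct above can be sketched in plain Python, without construct. This is only an illustration of the duplication I want to avoid: the names `expected_size`, `split_message`, `BASE_SIZE`, `OPTIONAL_SIZE` and `FLAG_MASK` are hypothetical, the layout is inferred from the Struct (3 raw bytes, one bit-packed byte, 4 optional bytes), and the flag position assumes the BitStruct packs its fields most-significant bit first. Delimiter handling is left out.

```python
# Hypothetical layout, inferred from the Struct above (an assumption, not a spec):
#   3 raw header bytes, 1 bit-packed byte, then 4 optional bytes if the flag is set.
BASE_SIZE = 4      # Bytes(HEADER_RAW, 3) + the one-byte BitStruct
OPTIONAL_SIZE = 4  # Byte(BATTERY_CHARGE) + Bytes(OPTIONAL_HEADER, 3)
FLAG_MASK = 0x08   # assumes MSB-first packing: 4-bit nibble, 1-bit flag, 3 padding bits


def expected_size(data):
    """Return the full message size, or None if the flag byte has not arrived yet."""
    if len(data) < BASE_SIZE:
        return None
    flags = ord(data[3:4])  # slicing keeps this valid for str (Python 2) and bytes (Python 3)
    return BASE_SIZE + (OPTIONAL_SIZE if flags & FLAG_MASK else 0)


def split_message(buffer):
    """Return (message, rest); message is None while the buffer is still incomplete."""
    size = expected_size(buffer)
    if size is None or len(buffer) < size:
        return None, buffer
    return buffer[:size], buffer[size:]
```

Every rule encoded here (field widths, flag position, optional block size) is already stated in the Struct, which is why I would rather not repeat it in the receiver.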
Edit
I was able to partially achieve the goal of pushing the buffering logic inside construct by simply making use of Macros and Adapters:
MY_PROTOCOL = Struct("whatever",
    Anchor("begin"),
    RepeatUntil(lambda obj, ctx: obj == RESERVED_CHAR, Field("garbage", 1)),
    NoneOf(Embed(HEADER_SECTION), [RESERVED_CHAR]),
    Anchor("end"),
    Value("size", lambda ctx: ctx.end - ctx.begin)
)
This greatly simplified the caller code (which is no longer in rawDataReceived, thanks to Glyph's suggestion):
def dataReceived(self, data):
    log.msg('Received data: {}'.format(bytes_to_hex(data)))
    self._buffer += data
    try:
        container = MY_PROTOCOL.parse(self._buffer)
        # Drop the bytes consumed by the parser (garbage plus message).
        self._buffer = self._buffer[container.size:]
        # Swap in a fresh deferred before firing the current one.
        d, self.d = self.d, self._create_new_transmission_deferred()
        d.callback(container)
    except ValidationError as err:
        self._cb_error("A validation error occurred. Discarding the rest of the message. {}".format(err))
        self._buffer = b''
    except FieldError as err:  # Incomplete message: keep buffering and retry
        if len(self._buffer) >= MyMessageReceiver.MAX_GARBAGE_SIZE:
            self._cb_error("Buffer overflown. No delimiter found in the stream")
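Unfortunately, this solution only partially covers the requirements: I could not find a way to have construct tell me the index in the stream at which the error occurred, so I am forced to drop the entire buffer, which is not ideal.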
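One possible workaround, sketched here in plain Python and not taken from construct, would be to resynchronise on the reserved character instead of clearing everything. The helper name `resync` is hypothetical, and the approach rests on an assumption: that any later occurrence of the reserved character is a plausible start of the next message. It does not recover the actual error index from construct.

```python
def resync(buffer, delimiter):
    """Drop the frame that failed validation, keeping data from the next delimiter on.

    Returns an empty byte string when no later delimiter is buffered yet.
    """
    first = buffer.find(delimiter)
    if first == -1:
        # No frame start at all: nothing worth keeping.
        return b''
    # Search past the delimiter that opened the bad frame, otherwise the
    # same bytes would be parsed (and rejected) again on the next call.
    following = buffer.find(delimiter, first + len(delimiter))
    if following == -1:
        return b''
    return buffer[following:]
```

In the ValidationError branch of dataReceived, `self._buffer = resync(self._buffer, RESERVED_CHAR)` would then replace `self._buffer = b''`, so that a message already queued behind the bad one is not lost.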