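I have a very simple decoder that extends FrameDecoder.
It decodes a simple text protocol (messages are always variable length): it reads the buffer until it finds a '\r' and then returns a ChannelBuffer containing exactly one decoded message.
Example text protocol/message: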
xxxx,yyyy,zzzz\r
I always combine this decoder with a StringDecoder, like this:
p.addLast("stdDecoder", new SimpleTextDecoder()); //extends FrameDecoder
p.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
Everything works fine until I reach a higher throughput (roughly 200 tps); then the decoder occasionally starts returning corrupted buffers such as:
>xxxx,yyyxxxx,yyyy,zzzz\r
or:
>y,zzzz\r etc.
The decoder:
public static final char END_MESSAGE = 13;
if (buffer.readableBytes() < 6) {
return null;
}
buffer.markReaderIndex();
if (1 <= buffer.readableBytes()) {
byte bite = buffer.readByte();
if (bite != END_MESSAGE) {
/* Create a new ChannelBuffer which is used for the cumulation. */
ChannelBuffer frameBuffer = buffer.factory().getBuffer(buffer.capacity());
/* Write byte */
frameBuffer.writeByte(bite);
while (buffer.readable()) {
/* read next byte */
bite = buffer.readByte();
if (bite != END_MESSAGE) {
frameBuffer.writeByte(bite);
} else {
/* Write end message and return buffer */
frameBuffer.writeByte(bite);
return frameBuffer;
}
}
} else {
return null;
}
}
/* not readable bytes return null*/
return null;
}
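One possible way to reproduce the truncated `>y,zzzz\r` symptom outside of Netty is sketched below. It is a stand-alone illustration that uses `java.nio.ByteBuffer` instead of `ChannelBuffer`, and the class and method names (`PartialReadDemo`, `decodeLossy`, `decodeWithReset`, `receive`) are hypothetical. It assumes the cause is that bytes consumed while scanning are not restored when the terminator has not arrived yet; it does not attempt to explain the `xxxx,yyyxxxx,...` variant.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Stand-alone illustration with java.nio.ByteBuffer; this is not Netty code. */
final class PartialReadDemo {
    static final byte END_MESSAGE = 13; // '\r'

    /** Consumes bytes while scanning; on a partial message those bytes are lost. */
    static String decodeLossy(ByteBuffer in) {
        StringBuilder frame = new StringBuilder();
        while (in.hasRemaining()) {
            byte b = in.get();
            frame.append((char) b);
            if (b == END_MESSAGE) {
                return frame.toString();
            }
        }
        return null; // the position was advanced and is never restored
    }

    /** Same scan, but rewinds to the marked position when no terminator is found. */
    static String decodeWithReset(ByteBuffer in) {
        in.mark();
        StringBuilder frame = new StringBuilder();
        while (in.hasRemaining()) {
            byte b = in.get();
            frame.append((char) b);
            if (b == END_MESSAGE) {
                return frame.toString();
            }
        }
        in.reset(); // leave the partial message in place for the next call
        return null;
    }

    /** Appends newly "received" text to a buffer that is currently in read mode. */
    static void receive(ByteBuffer buf, String text) {
        buf.compact(); // switch to write mode, keeping any unread bytes
        buf.put(text.getBytes(StandardCharsets.US_ASCII));
        buf.flip();    // back to read mode
    }
}
```

Feeding `xxxx,yyy` and then `y,zzzz\r` into each variant shows the difference: the lossy version drops the first chunk, while the version that resets keeps it until the rest of the message arrives.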
Edit: OK, here is a new version that now works. I'd appreciate some feedback if possible :)
public class xxxxDecoder extends FrameDecoder {
private static final Logger log = LoggerFactory.getLogger(xxxxDecoder.class);
public static final char END_MESSAGE = 13;
public static final char START_MESSAGE = 83;
//Decodes the received packets so far into a frame.
@Override
protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buffer) throws Exception {
/* Shortest known xxx message\r
* 8 bytes
*/
if (buffer.readableBytes() < 8) {
if (Consts.DEBUG_ENABLED) {
log.debug("YAWN... data Smaller than 8 bytes,waiting for more," + buffer.readableBytes());
}
/*If null is returned, it means there's not enough data yet.*/
buffer.resetReaderIndex();
return null;
} else {
log.debug("Readable bytes=" + buffer.readableBytes() + ",buffersize=" + buffer.capacity());
}
/* Mark the current reader index so it can be restored if the message is incomplete */
buffer.markReaderIndex();
if (1 <= buffer.readableBytes()) {
byte bite = buffer.readByte();
if (bite != END_MESSAGE) {
if (Consts.DEBUG_ENABLED) {
log.debug("Yipee...Found new message," + bite + ",buffersize=" + buffer.capacity());
}
/* Create a new ChannelBuffer which is used for the cumulation. */
ChannelBuffer frameBuffer = buffer.factory().getBuffer(buffer.capacity());
/* Write byte */
frameBuffer.writeByte(bite);
while (buffer.readable()) {
/* read next byte */
bite = buffer.readByte();
if (bite != END_MESSAGE) {
frameBuffer.writeByte(bite);
} else {
/* Write end message and return buffer */
frameBuffer.writeByte(bite);
if (Consts.DEBUG_ENABLED) {
log.debug("Aha...Found end message=" + bite + ", returning complete message");
}
return frameBuffer;
}
}
} else {
/*If null is returned, it means there's not enough data yet.*/
buffer.resetReaderIndex();
return null;
}
}
/* not readable bytes return null*/
buffer.resetReaderIndex();
return null;
}
}
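As one piece of possible feedback, the byte-by-byte copy into a capacity-sized buffer could be replaced by a scan-first approach: look for the terminator without consuming anything, and only then read exactly one frame. The sketch below illustrates the idea with `java.nio.ByteBuffer` rather than `ChannelBuffer`; the names `CrFrameScanner`, `nextFrame` and `drain` are hypothetical, and it assumes `'\r'` never appears inside a message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration with java.nio.ByteBuffer; this is not Netty code. */
final class CrFrameScanner {
    static final byte END_MESSAGE = 13; // '\r'

    /** Returns the next complete frame (including '\r'), or null if none is buffered yet. */
    static String nextFrame(ByteBuffer in) {
        // Absolute get(int) does not move the position, so scanning consumes nothing.
        for (int i = in.position(); i < in.limit(); i++) {
            if (in.get(i) == END_MESSAGE) {
                byte[] frame = new byte[i - in.position() + 1];
                in.get(frame); // consume exactly one frame
                return new String(frame, StandardCharsets.UTF_8);
            }
        }
        return null; // incomplete message: position is untouched
    }

    /** Extracts every complete frame currently buffered, leaving any partial tail behind. */
    static List<String> drain(ByteBuffer in) {
        List<String> frames = new ArrayList<>();
        String frame;
        while ((frame = nextFrame(in)) != null) {
            frames.add(frame);
        }
        return frames;
    }
}
```

Because nothing is consumed until a terminator is found, there is no reader index to mark or reset, and each frame is sized to the message instead of to the whole buffer. Netty 3 also ships a DelimiterBasedFrameDecoder, which may be worth checking before maintaining a hand-written decoder.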