2

我有一个非常简单的解码器,它扩展了 FrameDecoder。

它将解码一个简单的文本协议(总是可变长度)并读取缓冲区,直到它找到一个'\r',然后将返回 ChannelBuffer(包含 1 条解码消息)。

示例文本协议/消息:

xxxx,yyyy,zzzz\r

我总是像这样将解码器与字符串解码器结合起来:

p.addLast("stdDecoder", new SimpleTextDecoder()); //extends FrameDecoder

p.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));

一切正常,直到吞吐量升高(约 200 tps)之后,解码器返回的 ChannelBuffer 偶尔会包含错乱的内容(消息被截断或与下一条消息粘连),如下所示:

>xxxx,yyyxxxx,yyyy,zzzz\r

或者:

>y,zzzz\r

解码器:

   public static final char END_MESSAGE = 13;

   if (buffer.readableBytes() < 6) {
          return null;
   }
   buffer.markReaderIndex();
   if (1 <= buffer.readableBytes()) {
        byte bite = buffer.readByte();
        if (bite != END_MESSAGE) {
           /* Create a new ChannelBuffer which is used for the cumulation. */
           ChannelBuffer frameBuffer = buffer.factory().getBuffer(buffer.capacity());

           /* Write  byte */
           frameBuffer.writeByte(bite);

           while (buffer.readable()) {
               /* read next byte */
               bite = buffer.readByte();
               if (bite != END_MESSAGE) {
                   frameBuffer.writeByte(bite);
               } else {
                   /* Write end message and return buffer */
                   frameBuffer.writeByte(bite);
                  return frameBuffer;
               }
           }
       } else {
           return null;
       }
    }

    /* not readable bytes return null*/
    return null;
}

编辑:好的,下面是新版本,现在可以正常工作了。如果可能的话,希望能得到一些反馈 :)

public class xxxxDecoder extends FrameDecoder {
    /** Logger used for this decoder's debug output. */
    private static final Logger log = LoggerFactory.getLogger(xxxxDecoder.class);
    /** Frame terminator: character code 13, i.e. carriage return ({@code '\r'}). */
    public static final char END_MESSAGE = 13;
    /**
     * Character code 83 ({@code 'S'}). Not referenced by the decode method below;
     * presumably the expected first character of a message -- TODO confirm.
     */
    public static final char START_MESSAGE = 83;

    /**
     * Extracts one carriage-return-terminated frame from the bytes received so far.
     *
     * <p>The reader index is only advanced when a complete frame (including its
     * trailing {@code END_MESSAGE} byte) is available; otherwise the buffer is left
     * untouched so the remaining bytes are kept for the next call.
     *
     * <p>This method never calls {@code resetReaderIndex()}. Resetting without a
     * preceding {@code markReaderIndex()} in the same invocation rewinds to whatever
     * mark an earlier call left behind (the start of an already emitted frame),
     * which makes the decoder hand out duplicated or spliced messages.
     *
     * @param ctx     the handler context (unused)
     * @param channel the channel the data was read from (unused)
     * @param buffer  the cumulative buffer of received, not yet decoded bytes
     * @return a new buffer holding exactly one message including the terminating
     *         {@code END_MESSAGE} byte, or {@code null} if no complete message is
     *         available yet
     * @throws Exception declared by the overridden method; not thrown deliberately here
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx,
                            Channel channel,
                            ChannelBuffer buffer) throws Exception {

        final int readable = buffer.readableBytes();

        /* Shortest known xxx message\r is 8 bytes. */
        if (readable < 8) {
            if (Consts.DEBUG_ENABLED) {
                log.debug("YAWN... data Smaller than 8 bytes,waiting for more," + readable);
            }
            /* Nothing has been consumed, so simply wait for more data. */
            return null;
        }
        log.debug("Readable bytes=" + readable + ",buffersize=" + buffer.capacity());

        final int start = buffer.readerIndex();

        /* Peek at the first byte without moving the reader index. */
        final byte first = buffer.getByte(start);
        if (first == END_MESSAGE) {
            /*
             * NOTE(review): a frame that starts with the terminator is not consumed,
             * exactly as before. If the peer can ever send a stray '\r', the decoder
             * will wait on it forever -- confirm whether it should be skipped instead.
             */
            return null;
        }
        if (Consts.DEBUG_ENABLED) {
            log.debug("Yipee...Found new message," + first + ",buffersize=" + buffer.capacity());
        }

        /* Locate the terminator among the readable bytes; -1 means it has not arrived yet. */
        final int end = buffer.indexOf(start, buffer.writerIndex(), (byte) END_MESSAGE);
        if (end < 0) {
            return null;
        }

        /* Copy the message plus its terminator into a right-sized buffer and consume it. */
        final ChannelBuffer frameBuffer = buffer.readBytes(end - start + 1);
        if (Consts.DEBUG_ENABLED) {
            log.debug("Aha...Found end message=" + (byte) END_MESSAGE + ", returning complete message");
        }
        return frameBuffer;
    }
4

0 回答 0