1

我在我的一个应用程序中实现了一个 NettyDecoder

应用程序的协议很简单,前四个字符是消息长度,然后是消息。

帧解码器逻辑是

import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.nio.cs.StandardCharsets;

public class ITMDecoder extends FrameDecoder {


public static String bytesToStringUTFCustom(byte[] bytes) {
     char[] buffer = new char[bytes.length >> 1];
     for(int i = 0; i < buffer.length; i++) {
      int bpos = i << 1;
      char c = (char)(((bytes[bpos]&0x00FF)<<8) + (bytes[bpos+1]&0x00FF));
      buffer[i] = c;
     }
     return new String(buffer);
    }

protected Object decode(ChannelHandlerContext ctx, Channel channel,
        ChannelBuffer buf) throws Exception {

     Logger logger = LoggerFactory.getLogger(ITMDecoder.class);

    // Make sure if the length field was received.
    if (buf.readableBytes() < 4) {
        // The length field was not received yet - return null.
        // This method will be invoked again when more packets are
        // received and appended to the buffer.
        return null;
    }


    // The length field is in the buffer.

    // Mark the current buffer position before reading the length field
    // because the whole frame might not be in the buffer yet.
    // We will reset the buffer position to the marked position if
    // there's not enough bytes in the buffer.
    buf.markReaderIndex();

    // Read the length field.

    byte[] twoBytesLength = new byte[4];

        for(int i = 0 ; i < 4 ; i++)
        twoBytesLength[i] = buf.getByte(i);

    String str = new String(twoBytesLength, "UTF-8");
    Short shortValue =      Short.parseShort(str);
    int length = shortValue.intValue() + 4;
    // Make sure if there's enough bytes in the buffer.
    if (buf.readableBytes() < length) {
        // The whole bytes were not received yet - return null.
        // This method will be invoked again when more packets are
        // received and appended to the buffer.

        // Reset to the marked position to read the length field again
        // next time.
        buf.resetReaderIndex();

        return null;
    }

    // There's enough bytes in the buffer. Read it.

    ChannelBuffer frame = buf.readBytes(length);

    // Successfully decoded a frame. Return the decoded frame.
    return frame;
}

}

通道管道逻辑是:

     ServerBootstrap bootstrap = new ServerBootstrap(
             new NioServerSocketChannelFactory(
         Executors.newCachedThreadPool(),
         Executors.newCachedThreadPool()));


     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          public ChannelPipeline getPipeline() throws Exception {
           return Channels.pipeline(
            new ITMDecoder(),
            new M3AlertHandler()
           );
          };
         });

当交易量在 2 tps 左右时,它可以正常工作。但是,当以更高的 tps 发送事务时,帧解码器会损坏。

我用 Socket 工作台检查了同样的情况,其中包含一条 2 个可变长度的长消息

我用来发送到服务器的消息是消息 1 =00051234500041234

重复相同的 1000 次并在一秒钟内发送它,解码器在 5/6 条消息后损坏?

有什么我遗漏的东西实际上可以使它正常工作吗?

4

1 回答 1

3
for(int i = 0 ; i < 4 ; i++)
    twoBytesLength[i] = buf.getByte(i);

你永远不应该假设第一个字节的索引是0. 第一个字节的索引是buf.readerIndex()

for (int i = 0; i < 4; i ++) {
    twoBytesLength[i] = buf.getByte(buf.readerIndex() + i);
}

您可以进一步优化字节缓冲区访问,但这不是问题的范围。

于 2013-11-05T07:39:38.440 回答