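I have implemented a Netty decoder in one of my applications.
The application's protocol is simple: the first four characters are the message length, followed by the message itself.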
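To make the wire format concrete, here is a minimal plain-Java sketch of the framing described above. The class and method names (`LengthPrefixedFrames`, `encode`, `split`) are hypothetical and not part of the application. It assumes the length is written as four ASCII decimal digits and that payloads are ASCII, so one character equals one byte.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the original application.
final class LengthPrefixedFrames {

    // Prefix a payload with its length as four ASCII digits, e.g. "12345" -> "000512345".
    static String encode(String payload) {
        if (payload.length() > 9999) {
            throw new IllegalArgumentException("payload too long for a 4-digit length");
        }
        return String.format("%04d", payload.length()) + payload;
    }

    // Split a complete stream of concatenated frames into payloads (headers stripped).
    static List<String> split(String stream) {
        List<String> payloads = new ArrayList<>();
        int pos = 0;
        while (pos < stream.length()) {
            if (pos + 4 > stream.length()) {
                throw new IllegalArgumentException("truncated length field");
            }
            int length = Integer.parseInt(stream.substring(pos, pos + 4));
            int end = pos + 4 + length;
            if (end > stream.length()) {
                throw new IllegalArgumentException("truncated frame");
            }
            payloads.add(stream.substring(pos + 4, end));
            pos = end; // the next length field starts where this frame ended
        }
        return payloads;
    }
}
```

Unlike the decoder in this post, which returns each frame with its four-byte header still attached, this sketch returns only the payloads.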
The frame decoder logic is:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ITMDecoder extends FrameDecoder {

    public static String bytesToStringUTFCustom(byte[] bytes) {
        char[] buffer = new char[bytes.length >> 1];
        for (int i = 0; i < buffer.length; i++) {
            int bpos = i << 1;
            char c = (char) (((bytes[bpos] & 0x00FF) << 8) + (bytes[bpos + 1] & 0x00FF));
            buffer[i] = c;
        }
        return new String(buffer);
    }

    protected Object decode(ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buf) throws Exception {
        Logger logger = LoggerFactory.getLogger(ITMDecoder.class);

        // Make sure if the length field was received.
        if (buf.readableBytes() < 4) {
            // The length field was not received yet - return null.
            // This method will be invoked again when more packets are
            // received and appended to the buffer.
            return null;
        }

        // The length field is in the buffer.
        // Mark the current buffer position before reading the length field
        // because the whole frame might not be in the buffer yet.
        // We will reset the buffer position to the marked position if
        // there's not enough bytes in the buffer.
        buf.markReaderIndex();

        // Read the length field.
        byte[] twoBytesLength = new byte[4];
        for (int i = 0; i < 4; i++)
            twoBytesLength[i] = buf.getByte(i);
        String str = new String(twoBytesLength, "UTF-8");
        Short shortValue = Short.parseShort(str);
        int length = shortValue.intValue() + 4;

        // Make sure if there's enough bytes in the buffer.
        if (buf.readableBytes() < length) {
            // The whole bytes were not received yet - return null.
            // This method will be invoked again when more packets are
            // received and appended to the buffer.
            // Reset to the marked position to read the length field again
            // next time.
            buf.resetReaderIndex();
            return null;
        }

        // There's enough bytes in the buffer. Read it.
        ChannelBuffer frame = buf.readBytes(length);

        // Successfully decoded a frame. Return the decoded frame.
        return frame;
    }
}

The channel pipeline logic is:
ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline(
                new ITMDecoder(),
                new M3AlertHandler()
        );
    }
});

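It works fine when the transaction rate is around 2 tps. However, when transactions are sent at a higher tps, the frame decoder breaks.
I checked the same thing with Socket Workbench, using one long message made up of 2 variable-length messages.
The message I send to the server is message 1 = 00051234500041234
When I repeat that message 1000 times and send it all within one second, the decoder breaks after 5/6 messages.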
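A test like this means several frames can arrive in a single read, and a frame can be split across reads, so the length field of the next frame usually does not sit at offset 0 of the accumulated buffer. The sketch below models that situation in plain Java rather than Netty; `CumulativeFramer` and `feed` are hypothetical names, and ASCII payloads are assumed. The point it illustrates is reading the length field relative to the current read position. In Netty 3, `ChannelBuffer.getByte(int)` takes an absolute index and ignores `readerIndex`, so it may be worth checking whether the length is being read from the right place once earlier frames have been consumed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of a cumulative frame decoder (not Netty code).
final class CumulativeFramer {
    private byte[] pending = new byte[0]; // bytes received but not yet framed

    // Append a chunk and return every complete frame (header + payload) now available.
    List<String> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int readerIndex = 0;
        while (joined.length - readerIndex >= 4) {
            // Read the length field at the current read position, not at offset 0.
            String header = new String(joined, readerIndex, 4, StandardCharsets.US_ASCII);
            int frameLength = Integer.parseInt(header) + 4;
            if (joined.length - readerIndex < frameLength) {
                break; // incomplete frame: wait for more bytes
            }
            frames.add(new String(joined, readerIndex, frameLength, StandardCharsets.US_ASCII));
            readerIndex += frameLength;
        }
        // Keep only the unread tail for the next call.
        pending = Arrays.copyOfRange(joined, readerIndex, joined.length);
        return frames;
    }
}
```

Feeding the 1000 repeated messages to `feed` in arbitrary chunk sizes (for example 7 bytes at a time) is a quick way to check the framing logic without a network in between.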
Is there something I am missing that would make this work correctly?