我已经使用 Netty 3.3.1-Final 3 周了。我的协议有 3 个步骤,每个步骤都需要不同的FrameDecoder

  • 阅读论据
  • 传输一些数据
  • 数据管道相互关闭

我经历了很多我无法理解的“阻塞”问题。终于在我看来,阅读org.jboss.netty.example.portunification示例时,我在尝试动态更改我的 FrameDecoder时遇到了一些缓冲区问题:一个 FrameDecoder 的缓冲区(可能)在更改下一个时不是空的。 ..

有没有办法在 Netty 中轻松做到这一点?我必须更改我的协议吗?我需要编写一个大的 FrameDecoder 并管理一个状态吗?如果是这样,如何避免具有公共子部分的不同协议之间的代码重复(例如“读取参数”)?




----------- FrameDecoderUnifier 类 --------------

     * This FrameDecoder is able to forward the unused bytes from one decoder to the next one. It provides
     * a safe way to replace a FrameDecoder inside a Pipeline.
     * It is not safe to just add and remove FrameDecoder dynamically from a Pipeline because there is a risk
     * of unread bytes inside the buffer of the FrameDecoder you wan't to remove.
    public class FrameDecoderUnifier extends FrameDecoder {

        private final Method frameDecoderDecodeMethod;
        volatile boolean skip = false;
        LastFrameEventHandler eventHandler;
        LinkedList<Entry> entries;
        Entry entry = null;

        public FrameDecoderUnifier(LastFrameEventHandler eventHandler) {
            this.eventHandler = eventHandler;
            this.entries = new LinkedList<Entry>();
            try {
                this.frameDecoderDecodeMethod = FrameDecoder.class.getMethod("decode", ChannelHandlerContext.class, Channel.class, ChannelBuffer.class);
            } catch (NoSuchMethodException ex) {
                throw new RuntimeException(ex);
            } catch (SecurityException ex) {
                throw new RuntimeException(ex);

        public void addLast(FrameDecoder decoder, LastFrameIdentifier identifier) {
            entries.addLast(new Entry(decoder, identifier));

        private Object callDecode(FrameDecoder decoder, ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            return frameDecoderDecodeMethod.invoke(decoder, ctx, channel, buffer);

        protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            if (entry == null && !entries.isEmpty()) {
                entry = entries.getFirst();

            if (entry == null) {
                return buffer; //No framing, no decoding

            //Perform the decode operation
            Object obj = callDecode(entry.getDecoder(), ctx, channel, buffer);

            if (obj != null && entry.getIdentifier().isLastFrame(obj)) {
                //Fire event
                eventHandler.lastObjectDecoded(entry.getDecoder(), obj);
                entry = null;
            return obj;

         * You can use this interface to take some action when the current decoder is changed for the next one.
         * This can be useful to change some upper Handler in the pipeline.
        public interface LastFrameEventHandler {

            public void lastObjectDecoded(FrameDecoder decoder, Object obj);

        public interface LastFrameIdentifier {

             * True if after this frame, we should disable this decoder.
             * @param obj
             * @return 
            public abstract boolean isLastFrame(Object decodedObj);

        private class Entry {

            FrameDecoder decoder;
            LastFrameIdentifier identifier;

            public Entry(FrameDecoder decoder, LastFrameIdentifier identifier) {
                this.decoder = decoder;
                this.identifier = identifier;

            public FrameDecoder getDecoder() {
                return decoder;

            public LastFrameIdentifier getIdentifier() {
                return identifier;

2 回答 2


我遇到过类似的问题,因为从管道中删除帧解码器似乎并不能阻止它被调用,并且没有一种明显的方法可以使解码器表现得好像它不在链中一样:Netty坚持 decode() 至少读取一个字节,因此您不能简单地返回传入的 ChannelBuffer,而返回 null 会停止处理传入数据,直到下一个数据包到达,从而停止协议解码过程。

首先: FrameDecoder的 Netty 3.7 文档实际上有一节“用管道中的另一个解码器替换解码器”。它说:

仅仅通过调用 ChannelPipeline#replace() 是不可能实现这一点的


return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };

重要的是,在此之前必须启用“展开”,但是这部分很容易错过并且没有解释。我能找到的最好的线索是Netty issue 132,这显然导致了 FrameDecoders 上的“展开”标志。如果为真,解码器将以对下游处理程序透明的方式将此类数组解包为对象。窥视源代码似乎证实了这就是“展开”的意思。

其次:似乎有一种更简单的方法,因为该示例还显示了如何将数据沿管道向下传递而不改变。例如,在完成其工作后,我的同步数据包 FrameDecoder 设置了一个内部标志并将其自身从管道中移除,并正常返回解码后的对象。设置标志后的任何后续调用都只需像这样传递数据:

protected Object decode(ChannelHandlerContext ctx,
                        Channel channel, ChannelBuffer cbuf) throws Exception {

    // Close the door on more than one sync packet being decoded
    if (m_received) {
        // Pass on the data to the next handler in the pipeline.
        // Note we can't just return cbuf as-is, we must drain it
        // and return a new one.  Otherwise Netty will detect that
        // no bytes were read and throw an IllegalStateException.
        return cbuf.readBytes(cbuf.readableBytes());

    // Handle the framing
    ChannelBuffer decoded = (ChannelBuffer) super.decode(ctx, channel, cbuf);
    if (decoded == null) {
        return null;

    // Remove ourselves from the pipeline now
    m_received = true;

    // Can we assume an array backed ChannelBuffer?
    // I have only hints that we can't, so let's copy the bytes out.
    byte[] sequence = new byte[magicSequence.length];

    // We got the magic sequence?  Return the appropriate SyncMsg
    return new SyncMsg(Arrays.equals(sequence, magicSequence));

从 LengthFieldBasedFrameDecoder 派生的解码器保持在下游并处理所有后续数据帧。到目前为止,对我有用。

于 2013-11-06T16:42:56.140 回答


  • 难以理解/调试代码
  • 处理程序没有明确定义的职责(这就是您删除/添加处理程序的原因吗?一个处理程序应该处理一种或多种(相关)类型的协议消息,而不是许多处理程序相同类型的消息)
  • 理想情况下,帧解码器只提取协议帧,而不是根据状态解码帧(这里帧解码器可以有内部解码器链来解码帧并触发带有解码消息的 MessageEvent,上面的处理程序可以对解码消息做出反应)。


于 2012-04-23T16:11:42.080 回答