0

我已经使用 Netty 3.3.1-Final 3 周了。我的协议有 3 个步骤,每个步骤都需要不同的FrameDecoder

  • 阅读论据
  • 传输一些数据
  • 数据管道相互关闭

我经历了很多我无法理解的“阻塞”问题。终于在我看来,阅读org.jboss.netty.example.portunification示例时,我在尝试动态更改我的 FrameDecoder时遇到了一些缓冲区问题:一个 FrameDecoder 的缓冲区(可能)在更改下一个时不是空的。 ..

有没有办法在 Netty 中轻松做到这一点?我必须更改我的协议吗?我需要编写一个大的 FrameDecoder 并管理一个状态吗?如果是这样,如何避免具有公共子部分的不同协议之间的代码重复(例如“读取参数”)?

今天我想到了一个FrameDecoderUnifier(下面的代码)的想法,目的是热添加和删除一些FrameDecoder,你怎么看?

谢谢你的帮助!

雷诺

----------- FrameDecoderUnifier 类 --------------

    /**
     * This FrameDecoder is able to forward the unused bytes from one decoder to the next one. It provides
     * a safe way to replace a FrameDecoder inside a Pipeline.
     * It is not safe to just add and remove FrameDecoder dynamically from a Pipeline because there is a risk
     * of unread bytes inside the buffer of the FrameDecoder you wan't to remove.
     */
    public class FrameDecoderUnifier extends FrameDecoder {

        private final Method frameDecoderDecodeMethod;
        volatile boolean skip = false;
        LastFrameEventHandler eventHandler;
        LinkedList<Entry> entries;
        Entry entry = null;

        public FrameDecoderUnifier(LastFrameEventHandler eventHandler) {
            this.eventHandler = eventHandler;
            this.entries = new LinkedList<Entry>();
            try {
                this.frameDecoderDecodeMethod = FrameDecoder.class.getMethod("decode", ChannelHandlerContext.class, Channel.class, ChannelBuffer.class);
            } catch (NoSuchMethodException ex) {
                throw new RuntimeException(ex);
            } catch (SecurityException ex) {
                throw new RuntimeException(ex);
            }
        }

        public void addLast(FrameDecoder decoder, LastFrameIdentifier identifier) {
            entries.addLast(new Entry(decoder, identifier));
        }

        private Object callDecode(FrameDecoder decoder, ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            return frameDecoderDecodeMethod.invoke(decoder, ctx, channel, buffer);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            if (entry == null && !entries.isEmpty()) {
                entry = entries.getFirst();
            }

            if (entry == null) {
                return buffer; //No framing, no decoding
            }

            //Perform the decode operation
            Object obj = callDecode(entry.getDecoder(), ctx, channel, buffer);

            if (obj != null && entry.getIdentifier().isLastFrame(obj)) {
                //Fire event
                eventHandler.lastObjectDecoded(entry.getDecoder(), obj);
                entry = null;
            }
            return obj;
        }

        /**
         * You can use this interface to take some action when the current decoder is changed for the next one.
         * This can be useful to change some upper Handler in the pipeline.
         */
        public interface LastFrameEventHandler {

            public void lastObjectDecoded(FrameDecoder decoder, Object obj);
        }

        public interface LastFrameIdentifier {

            /**
             * True if after this frame, we should disable this decoder.
             * @param obj
             * @return 
             */
            public abstract boolean isLastFrame(Object decodedObj);
        }

        private class Entry {

            FrameDecoder decoder;
            LastFrameIdentifier identifier;

            public Entry(FrameDecoder decoder, LastFrameIdentifier identifier) {
                this.decoder = decoder;
                this.identifier = identifier;
            }

            public FrameDecoder getDecoder() {
                return decoder;
            }

            public LastFrameIdentifier getIdentifier() {
                return identifier;
            }
        }
}
4

2 回答 2

2

我遇到过类似的问题,因为从管道中删除帧解码器似乎并不能阻止它被调用,并且没有一种明显的方法可以使解码器表现得好像它不在链中一样:Netty坚持 decode() 至少读取一个字节,因此您不能简单地返回传入的 ChannelBuffer,而返回 null 会停止处理传入数据,直到下一个数据包到达,从而停止协议解码过程。

首先: FrameDecoder的 Netty 3.7 文档实际上有一节“用管道中的另一个解码器替换解码器”。它说:

仅仅通过调用 ChannelPipeline#replace() 是不可能实现这一点的

相反,它建议通过返回一个包含解码的第一个数据包和接收到的其余数据的数组来传递数据。

return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };

重要的是,在此之前必须启用“展开”,但是这部分很容易错过并且没有解释。我能找到的最好的线索是Netty issue 132,这显然导致了 FrameDecoders 上的“展开”标志。如果为真,解码器将以对下游处理程序透明的方式将此类数组解包为对象。窥视源代码似乎证实了这就是“展开”的意思。

其次:似乎有一种更简单的方法,因为该示例还显示了如何将数据沿管道向下传递而不改变。例如,在完成其工作后,我的同步数据包 FrameDecoder 设置了一个内部标志并将其自身从管道中移除,并正常返回解码后的对象。设置标志后的任何后续调用都只需像这样传递数据:

protected Object decode(ChannelHandlerContext ctx,
                        Channel channel, ChannelBuffer cbuf) throws Exception {

    // Close the door on more than one sync packet being decoded
    if (m_received) {
        // Pass on the data to the next handler in the pipeline.
        // Note we can't just return cbuf as-is, we must drain it
        // and return a new one.  Otherwise Netty will detect that
        // no bytes were read and throw an IllegalStateException.
        return cbuf.readBytes(cbuf.readableBytes());
    }

    // Handle the framing
    ChannelBuffer decoded = (ChannelBuffer) super.decode(ctx, channel, cbuf);
    if (decoded == null) {
        return null;
    }

    // Remove ourselves from the pipeline now
    ctx.getPipeline().remove(this);
    m_received = true;

    // Can we assume an array backed ChannelBuffer?
    // I have only hints that we can't, so let's copy the bytes out.
    byte[] sequence = new byte[magicSequence.length];
    decoded.readBytes(sequence);

    // We got the magic sequence?  Return the appropriate SyncMsg
    return new SyncMsg(Arrays.equals(sequence, magicSequence));
}

从 LengthFieldBasedFrameDecoder 派生的解码器保持在下游并处理所有后续数据帧。到目前为止,对我有用。

于 2013-11-06T16:42:56.140 回答
1

我认为,应该避免使用基于某些状态切换内部解码器动态添加/删除上层处理程序的帧解码器,因为

  • 难以理解/调试代码
  • 处理程序没有明确定义的职责(这就是您删除/添加处理程序的原因吗?一个处理程序应该处理一种或多种(相关)类型的协议消息,而不是许多处理程序相同类型的消息)
  • 理想情况下,帧解码器只提取协议帧,而不是根据状态解码帧(这里帧解码器可以有内部解码器链来解码帧并触发带有解码消息的 MessageEvent,上面的处理程序可以对解码消息做出反应)。

更新:在这里,我考虑了一个协议,其中每条消息都可以有一个唯一的标签/标识符,并清楚地标记消息的结尾(例如标签长度值帧格式)

于 2012-04-23T16:11:42.080 回答