1

我有一个使用 Netty 3.6.6 的应用程序。我使用 netty 将随机数据包数据从客户端发送到服务器。

发件人使用此管道:

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        new LengthFieldPrepender(4) );
            }
    });

我用 ChannelBuffer 包装我的数据包数据,我希望处理程序预先添加 4 个字节长度,以便接收器可以知道数据包的开始和结束位置。

接收器使用:

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                    return Channels.pipeline(
                            new NettyReceiveHandler(listener));
            }
        });


    public class NettyReceiveHandler extends LengthFieldBasedFrameDecoder {
        @Override
        protected Object decode(ChannelHandlerContext ctx, Channel channel,
                ChannelBuffer buffer) throws Exception {
            ChannelBuffer decodedBuffer = (ChannelBuffer) super.decode(ctx, channel, buffer);
            if(decodedBuffer == null)
            {
                return null; // not ready yet
            }
            listener.handleObject(decodedBuffer);   
            return null;    // no upstream
        }

        public NettyReceiveHandler(NettyRecvListener listener) {
            super(THREEMiB, 0, 4, 0, 4); 
            this.listener = listener;       
        }

        private static final Logger logger = Logger.getLogger(
                NettyReceiveHandler.class.getName());

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            logger.severe("Unexpected exception from downstream." + e.getCause());
            e.getChannel().close();
        }

        private final NettyRecvListener listener;
        private static final int THREEMiB = 3*1024*1024;
    }

一切似乎都按预期工作。发送者一个接一个地发送随机大小的ChannelBuffers,接收者以相同的顺序接收相同的ChannelBuffers

现在我的问题是:

1.当我将一个大的ChannelBuffer写入通道时,它被分解为几个写入例如,我在测试的日志中看到了这个

WARNING: The pipeline contains no upstream handlers; discarding: [id: 0xa1b55c95, /127.0.0.1:52359 => localhost/127.0.0.1:9991] WRITTEN_AMOUNT: 131071

所以让我们假设我正在发送两个 1MiB 缓冲区(缓冲区 A 和缓冲区 B)并且每次写入大致分为 8 次写入 - 缓冲区 A1、缓冲区 A2、...和缓冲区 B1、缓冲区 B2、...如果缓冲区 A4 无法写入(由于太多加载或接收器错误地在回调中花费了太多时间),框架会中断吗?也就是说,bufferA 在接收端错误地由 bufferA1、bufferA2、bufferA3、bufferA5、bufferA6、bufferA7、bufferA8、bufferB1 组成。

有没有关于不破坏框架的保证。

2.是否总是在 NettyReceiveHandler::decode 中返回 null,因为我没有任何上游处理程序?

谢谢你的帮助。

4

1 回答 1

2

假设一个 TCP 连接,那么您的大型写入将不会被丢弃。Netty 会将数据排队,直到可以写入为止。Netty 可能会通过引发INTEREST_OPS事件来表示它的写入队列已满,但它不会阻止您将更多数据排队。

就个人而言,我会以不同的方式处理这个问题。我不会扩展 LengthFieldBasedFrameDecoder,而是创建一个带有 2 个处理程序的管道 - 一个标准 LengthFieldBasedFrameDecoder 和一个 NettyReceiveHandler。然后您所要做的就是覆盖 messageReceived 并使用 e.getMessage() (或任何您调用的 Message 事件参数)调用您的侦听器。调用 LengthFieldBasedFrameDecoder 的 decode 方法没有任何麻烦,或者担心是否需要返回 null。只处理消息。

于 2013-07-08T10:28:07.153 回答