3

我们有一个已经在 TCP/IP 中实现的服务器,但我们现在需要该协议也支持 UDP。

发送的每个 UDP 数据报都包含我需要解码的所有内容,因此它是一个非常简单的回复和响应系统,数据报中的数据由换行符分隔。

服务器启动时的引导代码如下所示:

    //SETUP UDP SERVER
    DatagramChannelFactory udpFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool());

    ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(udpFactory);

    udpBootstrap.setOption("sendBufferSize", 65536);
    udpBootstrap.setOption("receiveBufferSize", 65536);
    udpBootstrap.setOption("receiveBufferSizePredictorFactory", new AdaptiveReceiveBufferSizePredictorFactory());

    udpBootstrap.setOption("broadcast", "true");
    udpBootstrap.setPipelineFactory(new ServerPipeLineFactoryUDP());
    udpBootstrap.bind(new InetSocketAddress(hostIp, 4000)); 

管道代码是:

class ServerPipeLineFactoryUDP implements ChannelPipelineFactory
{

    private final static ExecutionHandler EXECUTION_HANDLER = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(ScorpionFMS.THREAD_POOL_COUNT, 0, 0));

    public ServerPipeLineFactoryUDP()
    {

    }

    @Override
    public ChannelPipeline getPipeline() throws Exception
    {

    ChannelPipeline pipeline = pipeline();
    pipeline.addLast("debugup", new DebugUpstreamHandler("UDP"));
    pipeline.addLast("debugdown", new DebugDownstreamHandler("UDP"));

    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(256, Delimiters.lineDelimiter()));

    pipeline.addLast("decoder", new UDPRequestDecoder(true));
    pipeline.addLast("encoder", new StringEncoder());
    pipeline.addLast("executor", EXECUTION_HANDLER);
    pipeline.addLast("handler", new UDPRequestHandler(

    return pipeline;
    }
}

我遇到的问题是每个数据报都使用该管道的相同实例(我希望每个数据报都使用管道的新实例),因此我在处理数据报内容时存储的所有状态都被保存,下一个数据报使用它也是如此,(而对于 TCP,每个连接都有自己的通道,因此有自己的管道实例和自己的状态)

我知道这是阅读文档的预期行为,但是无论如何强制netty为每个数据报重新创建管道?还是我以完全错误的方式解决这个问题?

简而言之,我希望每个数据报都有一个新的管道实例(与 tcp 相同)

4

2 回答 2

6

就像我在 IRC 中所说的那样,我认为这可以做你想做的,或者至少给你一些想法。

public class Example {

    public static void main(String[] args) {
        final ChannelPipelineHandlerImpl perDatagramFactory = new ChannelPipelineHandlerImpl();

        DatagramChannelFactory udpFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool());

        ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(udpFactory);

        udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new DistinctChannelPipelineHandler(perDatagramFactory));
            }
        });

    }

    private static final class DistinctChannelPipelineHandler implements ChannelDownstreamHandler, ChannelUpstreamHandler {
        private ChannelPipelineFactory factory;

        public DistinctChannelPipelineHandler(ChannelPipelineFactory factory) {
            this.factory = factory;
        }

        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            ChannelPipeline pipeline = factory.getPipeline();
            pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
            pipeline.sendUpstream(e);

            ctx.sendUpstream(e);

        }

        public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            ChannelPipeline pipeline = factory.getPipeline();
            pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
            pipeline.sendDownstream(e);

            ctx.sendDownstream(e);
        }

    }

    private static final class ChannelPipelineHandlerImpl implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {
            // Add your handlers here
            return Channels.pipeline();
        }

    }
}
于 2012-04-27T11:21:21.997 回答
0

我不确定如何处理 UDP 通道,但如果每个数据报的通道不同,您可以将状态存储在ChannelLocal中。

于 2012-04-26T16:25:34.137 回答