1

我正在编写一个简单的路由应用程序。这个想法是:我有服务器(或者说源节点),用来接收持续 x 时间的瞬时客户端连接。接收到的消息先被解码,然后根据消息的详细信息发送到相应的接收节点或已经打开的客户端。Router 类注册所有通道并尝试将它们保存在映射中,以便它可以过滤并得出消息的目的地。一旦我得到目的地,我应该能够选择实际的接收节点(根据配置可能是持久性的,也可能是瞬态的),将数据发送到该通道并等待响应,然后将响应发送回发起者。我想先知道我使用 netty 的实现是否朝着正确的方向发展?以及如何传递从任何服务器接收到的消息、将其发送到任何客户端,并把回复返回给原始源节点?

下面是我的源代码:它将/应该让您了解我在做什么:请在您的解释中使用代码示例。

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.ChannelFactory;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ChildChannelStateEvent;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

    /*
     * @author Kimathi
     */

    /**
     * Application entry point: assembles the source nodes, sink nodes and
     * configuration into a {@link Nodes} instance and boots it.
     */
    public class Service {

        /** Node container created by {@link #start()}; {@code null} until then. */
        private Nodes nodes;

        /**
         * Creates the node container, attaches fresh source nodes, sink nodes
         * and configurations to it, and boots it.
         */
        public void start(){

            nodes = new Nodes();
            nodes.addSourceNodes(new SourceNodes()).
                  addSinkNodes(new SinkNodes()).
                  addConfigurations(new Configurations()).
                  boot();
        }

        /**
         * Stops the nodes created by {@link #start()}.
         * Does nothing if {@code start()} has not been called yet.
         */
        public void stop(){

            // Without this guard, stop() before start() dereferenced a null field.
            if (nodes == null) {
                return;
            }

            nodes.stop();
        }

        /**
         * Starts the service.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String [] args){

            new Service().start();
        }

    }

    /**
     * Holds the source side, the sink side and the configuration, and
     * boots/stops both sides together.
     */
    class Nodes {

        private SourceNodes sourcenodes;
        private SinkNodes sinknodes;
        private Configurations configurations;

        /**
         * Stores the configurations later handed to the router.
         *
         * @param configurations configuration object to use
         * @return this instance, for chaining
         */
        public Nodes addConfigurations(Configurations configurations) {
            this.configurations = configurations;
            return this;
        }

        /**
         * Stores the source node group.
         *
         * @param sourcenodes source node group to boot
         * @return this instance, for chaining
         */
        public Nodes addSourceNodes(SourceNodes sourcenodes) {
            this.sourcenodes = sourcenodes;
            return this;
        }

        /**
         * Stores the sink node group.
         *
         * @param sinknodes sink node group to boot
         * @return this instance, for chaining
         */
        public Nodes addSinkNodes(SinkNodes sinknodes) {
            this.sinknodes = sinknodes;
            return this;
        }

        /**
         * Creates one router shared by both sides, registers ports 8000-8002 on
         * the source side and boots it, then registers 127.0.0.1:6000-6002 on
         * the sink side and boots it.
         */
        public void boot() {
            final Router sharedRouter = new Router(configurations);

            // Source side first: listening ports, router, boot.
            for (int port = 8000; port <= 8002; port++) {
                sourcenodes.addPort(port);
            }
            sourcenodes.addRouter(sharedRouter);
            sourcenodes.boot();

            // Sink side second: remote addresses, router, boot.
            for (int port = 6000; port <= 6002; port++) {
                sinknodes.addRemoteAddress("127.0.0.1", port);
            }
            sinknodes.addRouter(sharedRouter);
            sinknodes.boot();
        }

        /**
         * Stops the source side, then the sink side.
         */
        public void stop() {
            sourcenodes.stop();
            sinknodes.stop();
        }

    }

    /**
     * Server side of the router: binds one {@link ServerBootstrap} to every
     * registered port and installs a {@link SourceHandler} in each pipeline
     * produced by the bootstrap's pipeline factory.
     */
    final class SourceNodes implements Bootable , Routable {

        /** Ports to bind on {@link #boot()}, in registration order. */
        private final List<Integer> ports = new ArrayList<Integer>();

        /** Created by {@link #boot()}; {@code null} until then. */
        private ServerBootstrap serverbootstrap;

        private Router router;

        /**
         * Sets the router handed to every {@link SourceHandler} created afterwards.
         *
         * @param router router to pass to new handlers
         */
        @Override
        public void addRouter(final Router router){

            this.router = router;
        }

        /**
         * Registers a port to bind when {@link #boot()} is called.
         *
         * @param port local port number
         * @return this instance, for chaining
         */
        public SourceNodes addPort(int port){

            this.ports.add(port);

            return this;
        }

        /**
         * Creates the server bootstrap, sets the {@code child.tcpNoDelay} and
         * {@code child.keepAlive} options, installs the pipeline factory and
         * binds every registered port.
         */
        @Override
        public void boot(){

            this.initBootStrap();

            this.serverbootstrap.setOption("child.tcpNoDelay", true);
            this.serverbootstrap.setOption("child.keepAlive", true);
            this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() {

                @Override
                public ChannelPipeline getPipeline() throws Exception {

                    // The router field is read each time a pipeline is requested.
                    return Channels.pipeline(new SourceHandler(router));
                }
            });

            // NOTE(review): the channels returned by bind() are not retained, so
            // stop() cannot close them explicitly - confirm against the Netty
            // shutdown guidance whether they should be tracked and closed first.
            for(int port:this.ports){
                this.serverbootstrap.bind(new InetSocketAddress(port));
            }
        }

        /**
         * Releases the bootstrap's external resources.
         * Does nothing if {@link #boot()} has not been called.
         */
        @Override
        public void stop(){

            // Guard against stop() before boot(), which used to throw NullPointerException.
            if (this.serverbootstrap == null) {
                return;
            }

            this.serverbootstrap.releaseExternalResources();

        }

        /** Builds the NIO server channel factory and the bootstrap that uses it. */
        private void initBootStrap(){

            ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(),Executors.newCachedThreadPool());

            this.serverbootstrap = new ServerBootstrap(factory);
        }
    }

    /**
     * Client side of the router: connects one {@link ClientBootstrap} to every
     * registered remote address and installs a {@link SinkHandler} in each
     * pipeline produced by the bootstrap's pipeline factory.
     */
    final class SinkNodes implements Bootable , Routable {

        /** Remote endpoints to connect to on {@link #boot()}, in registration order. */
        private final List<SinkAddress> addresses = new ArrayList<SinkAddress>();

        /** Created by {@link #boot()}; {@code null} until then. */
        private ClientBootstrap clientbootstrap;

        private Router router;

        /**
         * Sets the router handed to every {@link SinkHandler} created afterwards.
         *
         * @param router router to pass to new handlers
         */
        @Override
        public void addRouter(final Router router){

            this.router = router;

        }

        /**
         * Registers a remote endpoint to connect to when {@link #boot()} is called.
         *
         * @param hostAddress remote host name or address
         * @param port remote port number
         * @return this instance, for chaining
         */
        public SinkNodes addRemoteAddress(String hostAddress,int port){

            this.addresses.add(new SinkAddress(hostAddress,port));

            return this;
        }

        /**
         * Creates the client bootstrap, sets the {@code tcpNoDelay} and
         * {@code keepAlive} options, installs the pipeline factory and starts a
         * connection attempt to every registered address.
         */
        @Override
        public void boot(){

            this.initBootStrap();

            this.clientbootstrap.setOption("tcpNoDelay", true);
            this.clientbootstrap.setOption("keepAlive", true);
            this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() {

                @Override
                public ChannelPipeline getPipeline() throws Exception {

                    // The router field is read each time a pipeline is requested.
                    return Channels.pipeline(new SinkHandler(router));
                }
            });

            // The values returned by connect() are not inspected here.
            for(SinkAddress address:this.addresses){

                this.clientbootstrap.connect(new InetSocketAddress(address.hostAddress(),address.port()));
            }
        }

        /**
         * Releases the bootstrap's external resources.
         * Does nothing if {@link #boot()} has not been called.
         */
        @Override
        public void stop(){

            // Guard against stop() before boot(), which used to throw NullPointerException.
            if (this.clientbootstrap == null) {
                return;
            }

            this.clientbootstrap.releaseExternalResources();
        }

        /** Builds the NIO client channel factory and the bootstrap that uses it. */
        private void initBootStrap(){

            ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(),Executors.newCachedThreadPool());

            this.clientbootstrap = new ClientBootstrap(factory);
        }

        /**
         * Immutable host/port pair. Declared static because it never uses the
         * enclosing {@code SinkNodes} instance.
         */
        private static final class SinkAddress {

            private final String hostAddress;
            private final int port;

            public SinkAddress(String hostAddress, int port) {
                this.hostAddress   = hostAddress;
                this.port = port;
            }

            public String hostAddress()   { return this.hostAddress; }
            public int port() { return this.port; }
        }
    }

    /**
     * Handler installed by {@code SourceNodes} in each pipeline it creates.
     * Currently it only logs channel events to standard output; the router is
     * stored but not yet used.
     */
    class SourceHandler extends SimpleChannelHandler {

        private final Router router;

        /**
         * @param router router this handler is meant to submit messages to
         */
        public SourceHandler(Router router){

            this.router = router;
        }

        // NOTE(review): this handler is created by the bootstrap's pipeline
        // factory; child-channel events are presumably delivered to the parent
        // (server) channel's pipeline instead, so this callback may never fire
        // here - confirm against the Netty 3 ServerBootstrap documentation.
        @Override
        public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {

            System.out.println("child is opened");
        }

        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

            System.out.println("child is closed");
        }

        // NOTE(review): likely invoked for each accepted connection rather than
        // for the listening socket, which would make this message misleading -
        // confirm before relying on it.
        @Override
        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

            System.out.println("Server is opened");
        }

        /**
         * Logs the cause and closes the channel the exception was raised on, so
         * a connection in an unknown state is not left open.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

            System.out.println(e.getCause());

            e.getChannel().close();
        }

        /**
         * Logs that a message arrived; the message itself is not processed yet.
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

            System.out.println("channel received message");
        }
    }

    /**
     * Handler installed by {@code SinkNodes} in each pipeline it creates.
     * Currently it only logs channel events to standard output; the router is
     * stored but not yet used.
     */
    class SinkHandler extends SimpleChannelHandler {

        private final Router router;

        /**
         * @param router router this handler is meant to submit messages to
         */
        public SinkHandler(Router router){

            this.router = router;
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

            System.out.println("Channel is connected");
        }

        /**
         * Logs the cause and closes the channel the exception was raised on, so
         * a connection in an unknown state is not left open.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

            System.out.println(e.getCause());

            e.getChannel().close();
        }

        /**
         * Logs that a message arrived; the message itself is not processed yet.
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

            System.out.println("channel received message");
        }
    }

    /**
     * Routing hub shared by the source and sink handlers. Both submit methods
     * are placeholders that currently always report "not responded".
     */
    final class Router {

        private final Configurations configurations;

        // Registries for the two sides; nothing reads or writes them yet.
        // NOTE(review): still raw types - pick key/value types once the routing
        // scheme is decided.
        private final Map sourcenodes = new HashMap();
        private final Map sinknodes = new HashMap();

        /** Creates a router without configurations. */
        public Router() {
            this(null);
        }

        /**
         * @param configurations configurations to keep for routing decisions
         */
        public Router(Configurations configurations) {
            this.configurations = configurations;
        }

        /**
         * Entry point for messages arriving on a source channel.
         *
         * @param ctx handler context of the receiving channel
         * @param e the received message event
         * @return always {@code false} for now
         */
        public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e) {
            return false;
        }

        /**
         * Entry point for messages arriving on a sink channel.
         *
         * @param ctx handler context of the receiving channel
         * @param e the received message event
         * @return always {@code false} for now
         */
        public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e) {
            return false;
        }
    }

    /**
     * Placeholder for routing configuration; it carries no state yet.
     */
    final class Configurations {

        /** Creates an empty configuration. */
        public Configurations() {
        }
    }

    /**
     * Lifecycle contract for components that can be started and stopped.
     */
    interface Bootable {

        /** Starts the component. */
        void boot();

        /** Stops the component. */
        void stop();
    }

    /**
     * Contract for components that accept a {@link Router}.
     */
    interface Routable {

        /**
         * Hands the router to this component.
         *
         * @param router router the component should use
         */
        void addRouter(Router router);
    }
4

1 回答 1

0

这个想法似乎很合理。

源通道处理程序可以直接使用 Channel#write(...) 写入相应的接收器通道,反之亦然。

当然,您还需要一种将回复与源通道相关联的方法,而最好的方法取决于协议的性质。如果可能的话,最好的办法是以某种方式把源通道 id 编码到发往接收器通道的消息中(当然回复中也要带上这个 id)。

如果这是不可能的,您将不得不以某种方式保持相关性。如果保证回复与发送的请求配对,则每个接收器通道的 FIFO 队列可能是合适的。

于 2013-05-09T17:29:17.200 回答