2

我是 vert.x 的新手。我正在尝试 vert.x “NetServer” 功能。http://vertx.io/core_manual_java.html#writing-tcp-servers-and-clients它就像一个魅力。

但是,我还读到“Verticle 实例是严格单线程的。

如果您创建一个简单的 TCP 服务器并部署它的单个实例,那么该服务器的所有处理程序总是在同一个事件循环(线程)上执行。”

目前,对于我的实现,我想接收 TCP 字节流,然后触发另一个组件。但这不应该是 Verticle 的“开始”方法中的阻塞调用。那么,在 start 方法中编写执行程序是一种好习惯吗?或者 vertx 会自动处理这种情况。

这是一个片段

public class TCPListener extends Verticle {

    public void start(){

        NetServer server = vertx.createNetServer();

        server.connectHandler(new Handler<NetSocket>() {
            public void handle(NetSocket sock) {
                container.logger().info("A client has connected");
                sock.dataHandler(new Handler<Buffer>() {
                    public void handle(Buffer buffer) {
                        container.logger().info("I received " + buffer.length() + " bytes of data");

                        container.logger().info("I received " + new String(buffer.getBytes()));
                        //Trigger another component here. SHould be done in a sperate thread. 
                        //The previous call should be returned . No need to wait for component response.
                    }
                });
            }
        }).listen(1234, "host");
    }
}

应该是什么机制使它成为非阻塞调用。

4

3 回答 3

9

我认为这不是 vert.x 的方式。

更好的方法是正确使用事件总线而不是 Executor。让工作人员响应总线上的事件,进行处理,并在完成时向总线发出信号。

创建线程违背了使用 vert.x 的目的。

于 2013-11-06T14:26:14.013 回答
1

最灵活的方法是创建一个 ExecutorService 并用它处理请求。这带来了对工作线程模型的细粒度控制(固定或可变数量的线程,应在单个线程上串行执行的工作等)。

修改后的示例可能如下所示:

public class TCPListener extends Verticle {

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    public void start(){

        NetServer server = vertx.createNetServer();

        server.connectHandler(new Handler<NetSocket>() {
            public void handle(final NetSocket sock) { // <-- Note 'final' here
                container.logger().info("A client has connected");
                sock.dataHandler(new Handler<Buffer>() {
                    public void handle(final Buffer buffer) { // <-- Note 'final' here

                        //Trigger another component here. SHould be done in a sperate thread. 
                        //The previous call should be returned . No need to wait for component response.
                        executor.submit(new Runnable() {

                            public void run() {
                                //It's okay to read buffer data here
                                //and use sock.write() if necessary
                                container.logger().info("I received " + buffer.length() + " bytes of data");
                                container.logger().info("I received " + new String(buffer.getBytes()));
                            }
                        }
                    }
                });
            }
        }).listen(1234, "host");
    }
}
于 2013-08-21T09:54:05.520 回答
0

正如 duffymo 提到的,创建线程违背了使用 vertx 的目的。最好的方法是将消息写入事件总线并创建一个新的处理程序来监听来自事件总线的消息。更新了代码以展示这一点。将消息写入“next.topic”主题,并注册一个处理程序以从“next.topic”主题读取消息。

public class TCPListener extends Verticle {

public void start(){

    NetServer server = vertx.createNetServer();

    server.connectHandler(new Handler<NetSocket>() {
        public void handle(NetSocket sock) {
            container.logger().info("A client has connected");
            sock.dataHandler(new Handler<Buffer>() {
                public void handle(Buffer buffer) {
                    String recvMesg = new String(buffer.getBytes());
                    container.logger().info("I received " + buffer.length() + " bytes of data");

                    container.logger().info("I received " + recvMesg);
                    //Writing received message to event bus
                    vertx.eventBus().send("next.topic", recvMesg);
                }
            });
        }
    }).listen(1234, "host");

    //Registering new handler listening to "next.topic" topic on event bus
    vertx.eventBus().registerHandler("next.topic", new Handler<Message<String>() {
       public void handle(Message<String> mesg) {
           container.logger.info("Received message: "+mesg.body());
       }
    };

}
}
于 2015-07-03T00:18:13.827 回答