1

我编写了两个 junit 方法来使用 Atmosphere 和 webSockets 测试我的 Jersey 资源。

问题是当我调用 Suspend 和 Broacast 时,只调用了我的 WebSocketTextListener 的 onOpen 方法。OnError、OnMessage、OnClose 都没有被调用:(

知道为什么不调用 OnMessage 方法吗?

大气球衣资源:

@Path("/websocket")
    @Suspend
    @GET
    @Produces({MediaType.APPLICATION_JSON})
    public String suspend() {
        return "";
    }   

    @Path("/websocket")
    @Broadcast(writeEntity = false)
    @POST
    @Produces({MediaType.APPLICATION_JSON})
    public String broadcast(String message) {
        return "BROADCASTTT";
    }  

测试暂停 WEBSOCKET 呼叫:

 @Test
    public void testAddMealSubscriber() throws Exception {

        final CountDownLatch latch = new CountDownLatch(1);
        String restaurantId = "SalernoNapoliBarcelona";
        String mealId = "14b74bddc68d6f1b4c22e7f7b200067f";

        String url = "ws://localhost:8080/rest/" + "restaurants/" + restaurantId + "/meals/" + mealId + "/websocket/";

        AsyncHttpClient client = new AsyncHttpClient();

        try {
            final AtomicReference response = new AtomicReference(null);
            WebSocket websocket = client.prepareGet(url)
                    .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
                            new WebSocketTextListener() {

                                @Override
                                public void onMessage(String message) {
                                    System.out.println("WebSocketTextListener onMessage:" + message);
                                    response.set(message);
                                    latch.countDown();
                                }

                                @Override
                                public void onFragment(String fragment, boolean last) {

                                    System.out.println("WebSocketTextListener onFragment:" + fragment);
                                }

                                @Override
                                public void onOpen(WebSocket websocket) {
                                    System.out.println("WebSocketTextListener onOpen");
                                }

                                @Override
                                public void onClose(WebSocket websocket) {
                                    System.out.println("WebSocketTextListener onClose");
                                    latch.countDown();
                                }

                                @Override
                                public void onError(Throwable t) {
                                    System.out.println("WebSocketTextListener onError");
                                    t.printStackTrace();
                                }
                            }).build()).get();

            try {
                latch.await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            assertNotNull(response.get());
            assertEquals(response.get(), "echo");
        } catch (Exception e) {
            e.printStackTrace();
        }

        client.close();

    }

测试广播网络电话:

 @Test
    public void testAddMealPublisher() throws Exception {

        final CountDownLatch latch = new CountDownLatch(1);
        String restaurantId = "SalernoNapoliBarcelona";
        String mealId = "14b74bddc68d6f1b4c22e7f7b200067f";

        String url = "ws://localhost:8080/rest/" + "restaurants/" + restaurantId + "/meals/" + mealId + "/websocket/";

        AsyncHttpClient c = new AsyncHttpClient();
        try {
            final AtomicReference response = new AtomicReference(null);

            WebSocket websocket = c.prepareGet(url)
                    .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
                            new WebSocketTextListener() {

                                @Override
                                public void onMessage(String message) {
                                    response.set(message);
                                    latch.countDown();
                                }

                                @Override
                                public void onFragment(String fragment, boolean last) {

                                    System.out.println("WebSocketTextListener onFragment:" + fragment);
                                }

                                @Override
                                public void onOpen(WebSocket websocket) {
                                    System.out.println("WebSocketTextListener onOpen");
                                }

                                @Override
                                public void onClose(WebSocket websocket) {
                                    System.out.println("WebSocketTextListener onClose");
                                    latch.countDown();
                                }

                                @Override
                                public void onError(Throwable t) {
                                    System.out.println("WebSocketTextListener onError");
                                    t.printStackTrace();
                                }
                            }).build()).get().sendTextMessage("MESSSAGGGEEEE");


            try {
                latch.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            assertNotNull(response.get());
            assertEquals(response.get(), "echo");

        } catch (Exception e) {
            e.printStackTrace();
        }

        c.close();

    }

执行第一个 SUSPEND CALL 然后 BRODCAST 调用时的 Jersey 日志:

2012 年 7 月 20 日下午 1:54:10 com.sun.jersey.api.container.filter.LoggingFilter 过滤器
INFO: 1 * 服务器入站请求
1 > 获取 http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/
1 > Sec-WebSocket-版本:13
1 > 升级:WebSocket
1 > Sec-WebSocket-Key: Wf7vyIGCD3Sa8StcdsGIkg==
1 > 主机:本地主机:8080
1 > 接受:*/*
1 > 用户代理:NING/1.0
1 > 连接:升级
1> 来源:http://localhost:8080
1 > X-Atmosphere-Transport:websocket
1 >

2012 年 7 月 20 日下午 1:54:31 com.sun.jersey.api.container.filter.LoggingFilter 过滤器
INFO: 2 * 服务器入站请求
2 > 获取 http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/
2 > Sec-WebSocket-版本:13
2 > 升级:WebSocket
2 > Sec-WebSocket-Key: RH/DbdkwQK1xBwhyhXLkAQ==
2 > 主机:本地主机:8080
2 > 接受:*/*
2 > 用户代理:NING/1.0
2 > 连接:升级
2> 来源:http://localhost:8080
2 > X-Atmosphere-Transport: websocket
2 >

2012 年 7 月 20 日下午 1:54:34 com.sun.jersey.api.container.filter.LoggingFilter 过滤器
INFO: 3 * 服务器入站请求
3 > 发布 http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/
3 > X-Atmosphere-Transport: websocket
3 > X-Atmosphere-Transport: websocket
3 > 内容类型:application/json
3 >

2012 年 7 月 20 日下午 1:54:34 com.sun.jersey.api.container.filter.LoggingFilter$Adapter 完成
INFO: 3 * 服务器出站响应
3
4

1 回答 1

1

问题是由于两个不同的 AsyncHttpClient 用于 SUBSCRIBE 和 BROADCAST 调用。

要解决此问题,请在 SetUp 方法中创建一个 AsyncHttpClient 并在两种测试方法中使用它:

 private AsyncHttpClient client;

    @Before
    public void setUp() throws Exception {

         client = new AsyncHttpClient();

    }
于 2012-07-22T11:38:05.740 回答