0

在之前的体验中,Vertx GraphQL 和 Netflix Dgs 支持 Apollo web-socket 订阅规范。

我尝试使用 Spring 测试基于 WebSocket 的订阅WebClientWebTestClient当通过 WebSocket 消息 paylaod 发送订阅请求时它不起作用。

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
@Disabled
class WebSocketSubscriptionTests {

    @LocalServerPort
    int port;

    WebTestClient webClient;

    @Autowired
    PostService postService;

    @Autowired
    ObjectMapper objectMapper;

    @BeforeEach
    void setUp() {
        this.webClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
    }

    @SneakyThrows
    @Test
    public void testAddComment() {
        // there are 4 posts initialized.
        String postId = this.postService.getAllPosts().get(0).getId();
        log.debug("post id: {}", postId);
        // add comment
        var addCommentQuery = """
                mutation addNewComment($postId:String!, $content:String!){
                    addComment(postId:$postId, content:$content){
                        id
                        postId
                        content
                    }
                }""".trim();
        var addCommentVariables = Map.of(
                "postId", postId,
                "content", "test comment"
        );
        Map<String, Object> addCommentBody = Map.of("query", addCommentQuery, "variables", addCommentVariables);
        webClient.post().uri("/graphql")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .bodyValue(addCommentBody)
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .jsonPath("data.addComment.id").exists()
                .jsonPath("data.addComment.content").isEqualTo("test comment");


        //handle subscription to /graphql websocket endpoints
        Map<String, Object> queryPayload = Map.of(
                "query", "subscription onCommentAdded { commentAdded { id postId content } }",
                "extensions", emptyMap(),
                "variables", emptyMap());
        var body = Map.of(
                "payload", queryPayload,
                "type", "start",
                "id", "1"
        );

        var commentsReplay = new ArrayList<String>();
        var socketClient = new ReactorNettyWebSocketClient();
        WebSocketHandler socketHandler = session -> {
            Mono<Void> receiveMono = session.receive().doOnNext(
                    it -> {
                        log.debug("next item: {}", it);
//                        String text = it.getPayloadAsText();
//                        log.debug("receiving message as text: {}", text);
//                        if ("data".equals(JsonPath.read(text, "type"))) {
//                            String comment = JsonPath.read(text, "payload.data.commentAdded.content");
//                            commentsReplay.add(comment);
//                        }
                    }
            ).log().then();

            String message = null;
            try {
                message = objectMapper.writeValueAsString(body);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return session
                    .send(Mono.delay(Duration.ofMillis(100)).thenMany(Flux.just(message).map(session::textMessage)))
                    .log()
                    .then(receiveMono);
        };

        MultiValueMapAdapter<String, String> queryParams = new MultiValueMapAdapter<>(Map.of("query", List.<String>of("subscription onCommentAdded { commentAdded { id postId content } }")));

        URI uri = new DefaultUriBuilderFactory("ws://localhost:" + port + "/graphql").builder().queryParams(queryParams).build();
        socketClient.execute(uri, socketHandler).block(Duration.ofMillis(500));

        assertThat(commentsReplay.size()).isEqualTo(1);
        assertThat(commentsReplay.get(0)).isEqualTo("test comment");
    }
}

最后尝试在WebSocket连接url中添加查询操作作为查询参数,触发了WebSocket,但是失败了,receiveMono方法中没有数据。

控制台输出中引发了以下异常。

Connecting to ws://localhost:50123/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D
2021-10-09 12:10:47.660 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : GET "/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D", parameters={masked}
2021-10-09 12:10:47.667 DEBUG 13324 --- [o-auto-1-exec-2] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to io.leangen.graphql.spqr.spring.web.mvc.DefaultGraphQLController#executeGet(GraphQLRequest, Object)
2021-10-09 12:10:47.683 DEBUG 13324 --- [o-auto-1-exec-2] o.s.w.c.request.async.WebAsyncManager    : Started async request
2021-10-09 12:10:47.684 DEBUG 13324 --- [o-auto-1-exec-2] o.s.w.c.request.async.WebAsyncManager    : Async result set, dispatch to /graphql
2021-10-09 12:10:47.684 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : Exiting but response remains open for further handling
2021-10-09 12:10:47.685 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : "ASYNC" dispatch for GET "/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D", parameters={masked}
2021-10-09 12:10:47.686 DEBUG 13324 --- [o-auto-1-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : Resume with async result [{data=graphql.execution.reactive.CompletionStageMappingPublisher@6997f06}]
2021-10-09 12:10:47.687 DEBUG 13324 --- [o-auto-1-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Using 'application/json', given [*/*] and supported [application/json]
2021-10-09 12:10:47.687 DEBUG 13324 --- [o-auto-1-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Writing [{data=graphql.execution.reactive.CompletionStageMappingPublisher@6997f06}]
2021-10-09 12:10:47.696 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : Exiting from "ASYNC" dispatch, status 200
2021-10-09 12:10:47.706  WARN 13324 --- [ctor-http-nio-3] r.netty.http.client.HttpClientConnect    : [id:60230589-1, L:/127.0.0.1:50125 - R:localhost/127.0.0.1:50123] The connection observed an error

io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException: Invalid handshake response getStatus: 200 
    at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker13.verify(WebSocketClientHandshaker13.java:272) ~[netty-codec-http-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker.finishHandshake(WebSocketClientHandshaker.java:304) ~[netty-codec-http-4.1.68.Final.jar:4.1.68.Final]
    at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:116) ~[reactor-netty-http-1.0.11.jar:1.0.11]

不知道为什么 WebSocket 处理工作在这里被委托给一个通用的 HTTP 控制器,但没有正确返回执行结果。

完整的代码托管在我的 Github 上

4

0 回答 0