在之前的体验中,Vertx GraphQL 和 Netflix Dgs 支持 Apollo web-socket 订阅规范。
我尝试使用 Spring 测试基于 WebSocket 的订阅WebClient
,WebTestClient
当通过 WebSocket 消息 paylaod 发送订阅请求时它不起作用。
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
@Disabled
class WebSocketSubscriptionTests {
@LocalServerPort
int port;
WebTestClient webClient;
@Autowired
PostService postService;
@Autowired
ObjectMapper objectMapper;
@BeforeEach
void setUp() {
this.webClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
}
@SneakyThrows
@Test
public void testAddComment() {
// there are 4 posts initialized.
String postId = this.postService.getAllPosts().get(0).getId();
log.debug("post id: {}", postId);
// add comment
var addCommentQuery = """
mutation addNewComment($postId:String!, $content:String!){
addComment(postId:$postId, content:$content){
id
postId
content
}
}""".trim();
var addCommentVariables = Map.of(
"postId", postId,
"content", "test comment"
);
Map<String, Object> addCommentBody = Map.of("query", addCommentQuery, "variables", addCommentVariables);
webClient.post().uri("/graphql")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(addCommentBody)
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath("data.addComment.id").exists()
.jsonPath("data.addComment.content").isEqualTo("test comment");
//handle subscription to /graphql websocket endpoints
Map<String, Object> queryPayload = Map.of(
"query", "subscription onCommentAdded { commentAdded { id postId content } }",
"extensions", emptyMap(),
"variables", emptyMap());
var body = Map.of(
"payload", queryPayload,
"type", "start",
"id", "1"
);
var commentsReplay = new ArrayList<String>();
var socketClient = new ReactorNettyWebSocketClient();
WebSocketHandler socketHandler = session -> {
Mono<Void> receiveMono = session.receive().doOnNext(
it -> {
log.debug("next item: {}", it);
// String text = it.getPayloadAsText();
// log.debug("receiving message as text: {}", text);
// if ("data".equals(JsonPath.read(text, "type"))) {
// String comment = JsonPath.read(text, "payload.data.commentAdded.content");
// commentsReplay.add(comment);
// }
}
).log().then();
String message = null;
try {
message = objectMapper.writeValueAsString(body);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return session
.send(Mono.delay(Duration.ofMillis(100)).thenMany(Flux.just(message).map(session::textMessage)))
.log()
.then(receiveMono);
};
MultiValueMapAdapter<String, String> queryParams = new MultiValueMapAdapter<>(Map.of("query", List.<String>of("subscription onCommentAdded { commentAdded { id postId content } }")));
URI uri = new DefaultUriBuilderFactory("ws://localhost:" + port + "/graphql").builder().queryParams(queryParams).build();
socketClient.execute(uri, socketHandler).block(Duration.ofMillis(500));
assertThat(commentsReplay.size()).isEqualTo(1);
assertThat(commentsReplay.get(0)).isEqualTo("test comment");
}
}
最后尝试在WebSocket连接url中添加查询操作作为查询参数,触发了WebSocket,但是失败了,receiveMono
方法中没有数据。
控制台输出中引发了以下异常。
Connecting to ws://localhost:50123/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D
2021-10-09 12:10:47.660 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet : GET "/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D", parameters={masked}
2021-10-09 12:10:47.667 DEBUG 13324 --- [o-auto-1-exec-2] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to io.leangen.graphql.spqr.spring.web.mvc.DefaultGraphQLController#executeGet(GraphQLRequest, Object)
2021-10-09 12:10:47.683 DEBUG 13324 --- [o-auto-1-exec-2] o.s.w.c.request.async.WebAsyncManager : Started async request
2021-10-09 12:10:47.684 DEBUG 13324 --- [o-auto-1-exec-2] o.s.w.c.request.async.WebAsyncManager : Async result set, dispatch to /graphql
2021-10-09 12:10:47.684 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet : Exiting but response remains open for further handling
2021-10-09 12:10:47.685 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet : "ASYNC" dispatch for GET "/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D", parameters={masked}
2021-10-09 12:10:47.686 DEBUG 13324 --- [o-auto-1-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : Resume with async result [{data=graphql.execution.reactive.CompletionStageMappingPublisher@6997f06}]
2021-10-09 12:10:47.687 DEBUG 13324 --- [o-auto-1-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Using 'application/json', given [*/*] and supported [application/json]
2021-10-09 12:10:47.687 DEBUG 13324 --- [o-auto-1-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Writing [{data=graphql.execution.reactive.CompletionStageMappingPublisher@6997f06}]
2021-10-09 12:10:47.696 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet : Exiting from "ASYNC" dispatch, status 200
2021-10-09 12:10:47.706 WARN 13324 --- [ctor-http-nio-3] r.netty.http.client.HttpClientConnect : [id:60230589-1, L:/127.0.0.1:50125 - R:localhost/127.0.0.1:50123] The connection observed an error
io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException: Invalid handshake response getStatus: 200
at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker13.verify(WebSocketClientHandshaker13.java:272) ~[netty-codec-http-4.1.68.Final.jar:4.1.68.Final]
at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker.finishHandshake(WebSocketClientHandshaker.java:304) ~[netty-codec-http-4.1.68.Final.jar:4.1.68.Final]
at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:116) ~[reactor-netty-http-1.0.11.jar:1.0.11]
不知道为什么 WebSocket 处理工作在这里被委托给一个通用的 HTTP 控制器,但没有正确返回执行结果。