0

我们有一个 Caliban GraphQL 应用程序,将它与 Play 框架一起使用。它很好地涵盖了查询和突变的集成测试,现在我们要为订阅添加一些集成测试,并想知道如何正确地做到这一点。

对于我们使用通常的查询/突变测试FakeRequest,将其发送到扩展 Caliban's 的路由器,PlayRouter效果非常好。有没有类似的方法来测试 websockets/订阅?

互联网上关于 Play 中的 websocket 测试的信息量非常少,而且根本没有关于 GraphQL 订阅测试的信息。

将不胜感激任何想法!

4

1 回答 1

1

好的,我做到了。有几个规则要遵循:

  1. 使用 websocket 标头"WebSocket-Protocol" -> "graphql-ws"
  2. 建立连接后,发送GraphQLWSRequest类型"connection_init"
  3. 收到响应后"connection_ack",发送订阅查询作为有效负载GraphQLWSRequest的类型"start"

在这些步骤之后,服务器正在侦听,您可以发送您的变异查询。

一些草稿示例:

import caliban.client.GraphQLRequest
import caliban.client.ws.GraphQLWSRequest
import io.circe.syntax.EncoderOps
import play.api.libs.json.{JsValue, Json}
import play.api.test.Helpers.{POST, contentAsJson, contentAsString, contentType, route, status, _}
import org.awaitility.Awaitility

 def getWS(subscriptionQuery: String, postQuery: String): JsValue = {
    lazy val port    = Helpers.testServerPort
    val initRequest  = prepareWSRequest("connection_init")
    val startRequest = prepareWSRequest("start", Some(GraphQLRequest(subscriptionQuery, Map())))

    Helpers.running(TestServer(port, app)) {

      val headers = new java.util.HashMap[String, String]()
      headers.put("WebSocket-Protocol", "graphql-ws")

      val queue = new ArrayBlockingQueue[String](1)

      lazy val ws = new WebSocketClient(new URI(s"ws://localhost:$port/ws/graphql"), headers) {
        override def onOpen(handshakedata: ServerHandshake): Unit =
          logger.info("Websocket connection established")

        override def onClose(code: Port, reason: String, remote: Boolean): Unit =
          logger.info(s"Websocket connection closed, reason: $reason")

        override def onError(ex: Exception): Unit =
          logger.error("Error handling websocket connection", ex)

        override def onMessage(message: String): Unit = {
          val ttp = (Json.parse(message) \ "type").as[JsString].value
          if (ttp != "connection_ack" && ttp != "ka") queue.put(message)
        }
      }

      ws.connectBlocking()

      Future(ws.send(initRequest))
        .flatMap(_ => Future(ws.send(startRequest)))
        .flatMap(_ => post(query = postQuery)) // post is my local method, it sends usual FakeRequest
      
      Awaitility.await().until(() => queue.peek() != null)
      Json.parse(queue.take())
    }

  def prepareWSRequest(ttp: String, payload: Option[GraphQLRequest] = None) =
    GraphQLWSRequest(ttp, None, payload).asJson.noSpaces
  }
于 2021-06-23T13:23:33.023 回答