1

我正在使用 API 服务器通过使用无限分块响应来实现“服务器推送”功能。响应中的每个块代表一个推送到客户端的消息服务器。每个块实际上是一个完整的 json 对象。这是我用作接收服务器推送到的消息的客户端的代码。

Flux<JSONObject> jsonObjectFlux = client
        .post(uriBuilder.expand("/data/long_poll").toString(), request -> {
          String pollingRequest = createPollingRequest();
          return request
              .failOnClientError(false)
              .failOnServerError(false)
              .addHeader("Authorization", host.getToken())
              .addHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
              .addHeader(HttpHeaders.CONTENT_LENGTH,
                  String.valueOf(ByteBufUtil.utf8Bytes(pollingRequest)))
              .sendString(Mono.just(pollingRequest));
        }).flatMapMany(response -> response.receiveContent().map(httpContent -> {
          ByteBuf byteBuf = httpContent.content();
          String source = new String(ByteBufUtil.getBytes(byteBuf), Charsets.UTF_8);
          return new JSONObject(source);
        }));
    jsonObjectFlux.subscribe(jsonObject -> {
      logger.debug("JSON: {}", jsonObject);
    });

但是我遇到了异常,例如:

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.json.JSONException: Unterminated string at 846 [character 847 line 1]
Caused by: org.json.JSONException: Unterminated string at 846 [character 847 line 1]
    at org.json.JSONTokener.syntaxError(JSONTokener.java:433)
    at org.json.JSONTokener.nextString(JSONTokener.java:260)
    at org.json.JSONTokener.nextValue(JSONTokener.java:360)
    at org.json.JSONObject.<init>(JSONObject.java:214)
    at org.json.JSONTokener.nextValue(JSONTokener.java:363)
    at org.json.JSONObject.<init>(JSONObject.java:214)

显然,我没有得到完整的 json 数据。我想知道 usingresponse.receiveContent()是否是处理一大块数据的正确方法。

4

0 回答 0