6

I want to be able to read stream (from a socket) of json messages using Jackson (2).

There are ways to pass a Reader as the source, such as doing:

ObjectMapper mapper = new ObjectMapper();
MyObject obj = mapper.readValue(aReader, MyObject.class);

but that will block until the entire json message has arrived and I want to avoid that.

Is there a way to have a buffer to which I can keep adding bytes with the ability to ask if the buffer contains a full json representation of a specific class?
Something like:

JsonBuffer buffer = new JsonBuffer(MyObject.class);
...
buffer.add(readBytes);
if (buffer.hasObject()) {
    MyObject obj = buffer.readObject();
}

Thanks.

4

6 回答 6

6

Jackson从 2.9 开始支持非阻塞 JSON 流解析。您可以在 Spring Framework 5 Jackson2Tokenizer中找到有关如何使用它的示例。

于 2018-06-15T21:30:13.640 回答
4

您可以使用JsonParser来获取单个事件/令牌(这是ObjectMapper内部使用的),这允许更精细的访问。但是当前所有功能都使用阻塞 IO,因此没有办法进行所谓的非阻塞(又名“异步”)解析。

编辑:2019-09-18——更正:Jackson 2.9(https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.9)增加了对非阻塞/异步 JSON 解析的支持(问题https:// github.com/FasterXML/jackson-core/issues/57

于 2012-06-03T04:06:34.030 回答
4

(我知道这个线程很旧,但由于没有可接受的答案,我想添加我的,以防万一有人还在读这个)。

我刚刚发布了一个名为 Actson 的新库 ( https://github.com/michel-kraemer/actson )。它的工作原理几乎与 OP 建议的一样。您可以向它提供字节,直到它返回一个或多个 JSON 事件。当它消耗完所有输入数据后,您向它提供更多字节并获取下一个 JSON 事件。这个过程一直持续到 JSON 文本被完全使用。

如果您了解 Aalto XML ( https://github.com/FasterXML/aalto-xml ),那么您应该能够快速熟悉 Actson,因为界面几乎相同。

这是一个简单的例子:

// JSON text to parse
byte[] json = "{\"name\":\"Elvis\"}".getBytes(StandardCharsets.UTF_8);

JsonParser parser = new JsonParser(StandardCharsets.UTF_8);

int pos = 0; // position in the input JSON text
int event; // event returned by the parser
do {
    // feed the parser until it returns a new event
    while ((event = parser.nextEvent()) == JsonEvent.NEED_MORE_INPUT) {
        // provide the parser with more input
        pos += parser.getFeeder().feed(json, pos, json.length - pos);

        // indicate end of input to the parser
        if (pos == json.length) {
            parser.getFeeder().done();
        }
    }

    // handle event
    System.out.println("JSON event: " + event);
    if (event == JsonEvent.ERROR) {
        throw new IllegalStateException("Syntax error in JSON text");
    }
} while (event != JsonEvent.EOF);
于 2016-08-20T08:28:43.960 回答
3

这不是我问题的答案,而是我想出的更多解决方法。

我没有在 Jackson 方面处理非阻塞 IO,而是在我的协议中实现了它。
发送时的所有 json 消息都用 4 个字节的 int 填充,它保存消息其余部分的长度。
现在读取 json 消息变得很容易,我只需找出长度是多少,异步读取它,然后可以将 Jackson 与结果字符串一起使用。

如果有人知道如何直接从杰克逊那里做到这一点,我仍然很想知道。

于 2012-06-03T06:48:29.127 回答
0

如何使用Gson

private fun handleActions(webSocketMessage: WebSocketMessage, webSocketSession: WebSocketSession): Mono<WebSocketMessage> {
    val action = Gson().fromJson(webSocketMessage.payloadAsText, ActionWs::class.java)
    return when (action.action) {
        "REGISTER" -> companyService.createCompany(action.company)
            .map { webSocketSession.textMessage(jacksonObjectMapper().writeValueAsString(it)) }
        else -> Mono.just(webSocketSession.textMessage(Gson().toJson(CompanyInfo(0, 0, 0, "ACAO INVALIDA!"))))
    }
}
于 2021-05-12T14:44:03.093 回答
0

我为这个问题找到了一些可行的解决方案。您可以使用方法inputStream.available检查流中是否有一些字节,并且此方法也是非阻塞的。因此,您可以使用此方法检查是否存在某些内容 - 解析值,如果没有 - 等待一段时间再检查。下面显示了两个示例。

安全风格 - 检查 START_OBJECT json 令牌:

while (run.get()) { 
    if (inputStream.available() > 0) {
        for (JsonToken jsonToken; (null != (jsonToken = jsonParser.nextToken())); ) {
            if (JsonToken.START_OBJECT.equals(jsonToken)) {
                outputStream.onNext(jsonParser.readValueAs(tClass));
                break;
            }
        }
    } else {
        Thread.sleep(200); // Or can be another checking time.
    }
}

或者最简单的风格:

while (run.get()) {
    if (inputStream.available() > 0) {
        outputStream.onNext(jsonParser.readValueAs(tClass));
    } else {
        Thread.sleep(200); // Or can be another checking time.
    }
}
于 2018-03-09T19:48:59.473 回答