我是 Quarkus 的新手。我正在尝试使用接收输入的 quarkus reactive 编写 REST 端点,进行一些验证,将输入转换为列表,然后将消息写入 kafka。我的理解是将所有内容都转换为 Uni/Multi,这将导致在 I/O 线程上以异步方式执行。在 intelliJ 日志中,我可以看到代码在执行程序线程中以顺序方式执行。kafka 写入顺序发生在其自己的网络线程中,这增加了延迟。
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
//deflateMessage() converts input to a list of inputSample
Multi<InputSample> keys = Multi.createFrom().item(inputSample)
.onItem().transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.concatenate();
return keys.onItem().transformToUniAndMerge(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
});
}
@Inject
@Channel("write")
Emitter<String> emitter;
Uni<OutputSample> writeToKafka(InputSample kafkaPayload, ObjectMapper mapper) throws JsonProcessingException {
String inputSampleJson = mapper.writeValueAsString(kafkaPayload);
return Uni.createFrom().completionStage(emitter.send(inputSampleJson))
.onItem().transform(ignored -> new OutputSample("id", 200, "OK"))
.onFailure().recoverWithItem(new OutputSample("id", 500, "INTERNAL_SERVER_ERROR"));
}
我已经做了几天了。不确定是否做错了什么。任何帮助,将不胜感激。谢谢