0

我是 Quarkus 的新手。我正在尝试使用接收输入的 quarkus reactive 编写 REST 端点,进行一些验证,将输入转换为列表,然后将消息写入 kafka。我的理解是将所有内容都转换为 Uni/Multi,这将导致在 I/O 线程上以异步方式执行。在 intelliJ 日志中,我可以看到代码在执行程序线程中以顺序方式执行。kafka 写入顺序发生在其自己的网络线程中,这增加了延迟。

@POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<OutputSample> send(InputSample inputSample) {
        ObjectMapper mapper = new ObjectMapper();

       //deflateMessage() converts input to a list of inputSample
        Multi<InputSample> keys = Multi.createFrom().item(inputSample)
                .onItem().transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
                .concatenate();

     
        return keys.onItem().transformToUniAndMerge(payload -> {
            try {
                return producer.writeToKafka(payload, mapper);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        });
    } 
@Inject
@Channel("write")
    Emitter<String> emitter;

Uni<OutputSample> writeToKafka(InputSample kafkaPayload, ObjectMapper mapper) throws JsonProcessingException {
        
        String inputSampleJson = mapper.writeValueAsString(kafkaPayload);
        return Uni.createFrom().completionStage(emitter.send(inputSampleJson))
                .onItem().transform(ignored -> new OutputSample("id", 200, "OK"))
                .onFailure().recoverWithItem(new OutputSample("id", 500, "INTERNAL_SERVER_ERROR"));
    }

我已经做了几天了。不确定是否做错了什么。任何帮助,将不胜感激。谢谢

4

2 回答 2

2

与任何其他反应式库一样, mutiny主要围绕数据流控制设计。

话虽如此,它的核心将提供一组功能(通常通过一些操作员)来控制流执行和调度。这意味着除非您指示munity对象异步执行,否则它们将简单地以顺序(旧)方式执行。

使用两个运算符控制执行调度:

  • runSubscriptionOn:这将导致生成项目的代码片段(通常称为上游)在指定的线程上执行Executor
  • emitOn:这将导致订阅代码(通常称为下游)在指定的线程上执行Executor

然后,您可以按如下方式更新代码,从而导致通货紧缩异步进行:

Multi<InputSample> keys = Multi.createFrom()
    .item(inputSample)
    .onItem()
    .transformToMulti(array -> Multi.createFrom()
            .iterable(deflateMessage.deflateMessage(array)))
    .runSubscriptionOn(Infrastructure.getDefaultExecutor()) // items will be transformed on a separate thread
    .concatenate();

编辑:下游在单独的线程上

为了在单独的线程上完成完整的下游、转换和写入KafkaemitOn队列,您可以使用如下操作符:

@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
    ObjectMapper mapper = new ObjectMapper();
    return Uni.createFrom()
            .item(inputSample)
            .onItem()
            .transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
            .emitOn(Executors.newFixedThreadPool(5)) // items will be emitted on a separate thread after transformation
            .onItem()
            .transformToUniAndConcatenate(payload -> {
                try {
                    return producer.writeToKafka(payload, mapper);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return Uni.createFrom().<OutputSample>nothing();
            });
}
于 2021-10-02T12:56:46.337 回答
1

Multi 旨在用于当您有一个连续发出项目直到它发出完成事件的源时,这不是您的情况。

来自 Mutiny文档

Multi 表示数据流。流可以发出 0、1、n 或无限数量的项目。

您很少会自己创建 Multi 实例,而是使用公开 Mutiny API 的反应式客户端。

您正在寻找的是Uni<List<OutputSample>>因为您的 API 返回 1 且只有 1 项带有完整的结果列表。

因此,您需要将每条消息发送到 Kafka,而不是立即等待它们返回,而是收集生成的 Unis,然后将其收集到单个 Uni。

@POST
public Uni<List<OutputSample>> send(InputSample inputSample) {
    // This could be injected directly inside your producer
    ObjectMapper mapper = new ObjectMapper();

    // Send each item to Kafka and collect resulting Unis
    List<Uni<OutputSample>> uniList = deflateMessage(inputSample).stream()
            .map(input -> producer.writeToKafka(input, mapper))
            .collect(Collectors.toList());

    // Transform a list of Unis to a single Uni of a list
    @SuppressWarnings("unchecked") // Mutiny API fault...
    Uni<List<OutputSample>> result = Uni.combine().all().unis(uniList)
            .combinedWith(list -> (List<OutputSample>) list);

    return result;
}
于 2021-10-07T12:59:58.473 回答