2

我有一个将数据推送到 kafka 的端点。现在,我想分别用适当的状态码 2xx 或 5xx 来响应调用,以防 kafka 写入成功或失败。代码片段是

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
         // return appropriate response
          return Response
    }
}

现在的问题是端点在执行 ack 或 nack 回调之前响应状态码。也尝试了的sendAndAwait方法,MutinyEmitter但该方法返回无效。所以没有办法知道消息是 acked 还是 nacked。

4

1 回答 1

2

这里最好的方法是链接异步操作,如下所示:

@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
    return Uni.createFrom().completionStage(priceEmitter.send(price))
            .onItem().transform(ignored -> Response.ok().entity("foo").build())
            .onFailure().recoverWithItem(Response.serverError().build());
}

如果您想使用同步代码(我不推荐):

@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
    try {
        Uni.createFrom().completionStage(priceEmitter.send(price))
                .await().indefinitely();

        return Response.ok().entity("foo").build();
    } catch (Exception e) {
        return Response.serverError().build();
    }
}

.await().indefinitely() 将在 Uni 发出失败的情况下抛出异常。

您还可以选择直接使用发射器返回的 CompletionStage 而不将其转换为 Uni,但请记住 Quarkus 选择 Mutiny 作为其默认反应框架。

于 2021-09-29T14:19:19.167 回答