我有一个将数据推送到 kafka 的端点。现在,我想分别用适当的状态码 2xx 或 5xx 来响应调用,以防 kafka 写入成功或失败。代码片段是
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(Message.of(price)
.withAck(() -> {
// Called when the message is acked
return CompletableFuture.completedFuture(null);
})
.withNack(throwable -> {
// Called when the message is nacked
return CompletableFuture.completedFuture(null);
}));
// return appropriate response
return Response
}
}
现在的问题是端点在执行 ack 或 nack 回调之前响应状态码。也尝试了的sendAndAwait
方法,MutinyEmitter
但该方法返回无效。所以没有办法知道消息是 acked 还是 nacked。