如何使用 quarkus + kafka + smallrye 处理流处理异常?
我的代码与 quarkus 指南 ( https://quarkus.io/guides/kafka#imperative-usage )上的命令式生产者示例非常相似
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(price);
}
}
我想要类似于 vanilla Kafka 库的东西,它提供了处理请求发送的每条记录的回调的选项。
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.info(record.toString());
if (exception != null) {
logger.error("Producer exception", exception);
}
}
});
Tks