您好,我是 Webflux 的新手,我遵循构建反应式微服务的教程。在我的项目中,我遇到了以下问题。
我想为产品服务创建一个 crud api,下面是 Create 方法
@Override
public Product createProduct(Product product) {
Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
productEntity.ifPresent((prod -> {
throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
}));
ProductEntity entity = mapper.apiToEntity(product);
Mono<Product> newProduct = repository.save(entity)
.log()
.map(mapper::entityToApi);
return newProduct.block();
}
问题是,当我从邮递员调用此方法时,我收到错误 “block()/blockFirst()/blockLast() 正在阻塞,线程 reactor-http-nio-3 不支持”但是当我使用 StreamListener这个电话工作正常。流侦听器从 rabbit-mq 通道获取事件
流监听器
@EnableBinding(Sink.class)
public class MessageProcessor {
private final ProductService productService;
public MessageProcessor(ProductService productService) {
this.productService = productService;
}
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Product> event) {
switch (event.getEventType()) {
case CREATE:
Product product = event.getData();
LOG.info("Create product with ID: {}", product.getProductId());
productService.createProduct(product);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
LOG.warn(errorMessage);
throw new EventProcessingException(errorMessage);
}
}
}
我有两个问题。
- 为什么这适用于 StreamListener 而不是简单的请求?
- webflux 中是否有适当的方法来返回 Mono 的对象,或者我们总是必须返回 Mono?