0

您好,我是 Webflux 的新手,我遵循构建反应式微服务的教程。在我的项目中,我遇到了以下问题。

我想为产品服务创建一个 crud api,下面是 Create 方法

@Override
public Product createProduct(Product product) {

    Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
             productEntity.ifPresent((prod -> {
                  throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
              }));

    ProductEntity entity = mapper.apiToEntity(product);
    Mono<Product> newProduct = repository.save(entity)
        .log()
        .map(mapper::entityToApi);

    return newProduct.block();
}

问题是,当我从邮递员调用此方法时,我收到错误 “block()/blockFirst()/blockLast() 正在阻塞,线程 reactor-http-nio-3 不支持”但是当我使用 StreamListener这个电话工作正常。流侦听器从 rabbit-mq 通道获取事件

流监听器

@EnableBinding(Sink.class)
public class MessageProcessor {

    private final ProductService productService;

    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

       
        switch (event.getEventType()) {

            case CREATE:
                Product product = event.getData();
                LOG.info("Create product with ID: {}", product.getProductId());
                productService.createProduct(product);
                break;

   
            default:
                String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
                LOG.warn(errorMessage);
                throw new EventProcessingException(errorMessage);
        }
    }
}

我有两个问题。

  1. 为什么这适用于 StreamListener 而不是简单的请求?
  2. webflux 中是否有适当的方法来返回 Mono 的对象,或者我们总是必须返回 Mono?
4

1 回答 1

2

您的 create 方法希望看起来更像这样,并且您希望Mono<Product>从控制器返回 a 而不是单独的对象。

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .switchIfEmpty(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }

正如@Thomas 评论的那样,您正在破坏反应式编码的一些基础知识,并且没有通过使用 block() 获得好处,应该更多地阅读它。例如,您正在使用的响应式 mongo 存储库将返回一个 Mono,如果它是空的,它有自己的处理方法,而无需使用 Optional,如上所示。

如果实体已经存在,则编辑以映射到错误,否则保存

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> exists)
        .flatMap(exists -> Mono.error(new Exception("my exception")))
        .then(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }
于 2020-08-06T13:52:21.407 回答