0

如何在 Spring Cloud Stream 中实现反应式消息处理?我阅读了有关 Spring Cloud Function 的信息,并且我应该将它们用于响应式处理,因此我创建了示例一:

@Bean
public Consumer<Flux<Message<Loan>>> loanProcess() {
  return loanMessages ->
      loanMessages
          .flatMap(loanMessage -> Mono.fromCallable(() -> {
            if (loanMessage.getPayload().getStatus() == null) {
              log.error("Empty status");
              throw new RuntimeException("Loan status is empty");
            }
            return "Good";
          }))
          .doOnError(throwable -> log.error("Exception occurred: {}", throwable))
          .subscribe(status -> log.info("Message processed correctly: {}", status));
}

后来我开始思考上面的函数和带有@StreamListener的类和Reactor类型的使用有什么区别:

@StreamListener(Sink.INPUT)
public void loanReceived(Message<Loan> message) {
  Mono.just(message)
      .flatMap(loanMessage -> Mono.fromCallable(() -> {
        if (loanMessage.getPayload().getStatus() == null) {
          log.error("Empty status");
          throw new RuntimeException("Loan status is empty");
        }
        log.info("Correct message");
        return "Correct message received";
      }))
      .doOnError(throwable -> log.error("Exception occurred: {}", throwable.getClass()))
      .subscribe(status -> log.info("Message processed correctly: {}", status));
}

此外,在 Spring Webflux 中,我了解 netty 处理请求处理的线程很少(在事件循环中运行)。但是,我找不到 Spring Cloud Stream 中线程模型如何工作的文档。

4

0 回答 0