如何在 Spring Cloud Stream 中实现反应式消息处理?我阅读了有关 Spring Cloud Function 的信息,并且我应该将它们用于响应式处理,因此我创建了示例一:
@Bean
public Consumer<Flux<Message<Loan>>> loanProcess() {
return loanMessages ->
loanMessages
.flatMap(loanMessage -> Mono.fromCallable(() -> {
if (loanMessage.getPayload().getStatus() == null) {
log.error("Empty status");
throw new RuntimeException("Loan status is empty");
}
return "Good";
}))
.doOnError(throwable -> log.error("Exception occurred: {}", throwable))
.subscribe(status -> log.info("Message processed correctly: {}", status));
}
后来我开始思考上面的函数和带有@StreamListener的类和Reactor类型的使用有什么区别:
@StreamListener(Sink.INPUT)
public void loanReceived(Message<Loan> message) {
Mono.just(message)
.flatMap(loanMessage -> Mono.fromCallable(() -> {
if (loanMessage.getPayload().getStatus() == null) {
log.error("Empty status");
throw new RuntimeException("Loan status is empty");
}
log.info("Correct message");
return "Correct message received";
}))
.doOnError(throwable -> log.error("Exception occurred: {}", throwable.getClass()))
.subscribe(status -> log.info("Message processed correctly: {}", status));
}
此外,在 Spring Webflux 中,我了解 netty 处理请求处理的线程很少(在事件循环中运行)。但是,我找不到 Spring Cloud Stream 中线程模型如何工作的文档。