我正在使用 Spring Integration 5.1.5(与 RabbitMQ 一起使用spring-integration-amqp
)并且我在文档中读到 Spring Integration 支持项目反应器类型(我的意思是Mono
,Flux
等等)。但我不能让它为ServiceActivator工作。我正在尝试这样的事情:
@ServiceActivator
public Mono<Void> myMethod(List<Message> messages) {
Mono<Void> result = myService.doServiceStuff(messages);
return result;
}
(请注意,我也在尝试myMethod
使用,Flux<Message>
但这是一个单独的问题)。
myMethod
返回时我Mono<Void>
收到此错误:
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
... 42 common frames omitted
将方法更改为:
@ServiceActivator
public void myMethod(List<Message> messages) {
Mono<Void> result = myService.doServiceStuff(messages);
result.subscribe(); // This is not what I want to do
}
并手动订阅反应流将使其工作,但这显然不是我想要做的。我宁愿期望spring-integration
框架来处理订阅。
这在 Spring Integration 中是否支持?如果是这样,我做错了什么?