0

我有两个使用 RabbitMQ 进行通信的 Spring Boot 服务。Service1 向 Service2 发送会话创建请求。Service2 处理请求并应返回响应。Service1 应该处理响应。

Service1 请求会话的方法:

public void startSession()
{
     ListenableFuture<SessionCreationResponseDTO> sessionCreationResponse = sessionGateway.requestNewSession();

     sessionCreationResponse.addCallback(response -> {
             //handle success
     }, ex -> {
            // handle exception
     });
}

在 Service1 上,我定义了 AsyncOutboundGateway,例如:

@Bean
public IntegrationFlow requestSessionFlow(MessageChannel requestNewSessionChannel, 
                                          AsyncRabbitTemplate amqpTemplate,
                                          SessionProperties sessionProperties)
{
        return flow -> flow.channel(requestNewSessionChannel)
                .handle(Amqp.asyncOutboundGateway(amqpTemplate)
                        .exchangeName(sessionProperties.getRequestSession().getExchangeName())
                        .routingKey(sessionProperties.getRequestSession().getRoutingKey()));
    }

在 Service2 上,我有接收这些消息的流程:

@Bean
public IntegrationFlow requestNewSessionFlow(ConnectionFactory connectionFactory, 
                                             SessionProperties sessionProperties,
                                             MessageConverter messageConverter, 
                                             RequestNewSessionHandler requestNewSessionHandler)
{
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, 
                                sessionProperties.requestSessionProperties().queueName())
                .handle(requestNewSessionHandler)
                .get();

Service2 处理那里的请求:

@ServiceActivator(async = "true")
public ListenableFuture<SessionCreationResponseDTO> handleRequestNewSession()
{
    SettableListenableFuture<SessionCreationResponseDTO> settableListenableFuture = new SettableListenableFuture<>();

       // Goes through asynchronous process of creating session and sets value in listenable future

    return settableListenableFuture;
}

问题是 Service2 立即将 ListenableFuture 作为消息负载返回给 Service1,而不是等待未来的结果并发回结果。

如果我通过将 async 参数设置为 true 来正确理解文档文档@ServiceActivator,则应该返回成功的结果,如果出现异常,将使用错误通道。

可能我误解了文档,因此我需要在 Service2 流中解压缩 ListenableFuture,然后再将其作为响应返回,但我不确定如何实现。

我尝试了一些publishSubscribeChannel但没有太多运气的东西。

4

1 回答 1

1

你的问题在这里:

.handle(requestNewSessionHandler)

这样的配置看不到您@ServiceActivator(async = "true"),并将其用作常规阻塞服务激活器。

让我们看看这是否对您有帮助:

.handle(requestNewSessionHandler, "handleRequestNewSession", e -> e.async(true))

最好将其考虑为:或仅注释配置。或仅通过 Java DSL 编程。

于 2021-11-05T18:28:51.713 回答