1

我正在尝试使用 Spring Integration v4 的 DSL API 编写一个简单的消息流,如下所示:

       -> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE
Gateway
       <- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE

由于请求/响应是异步的,当我通过初始网关注入消息时,消息会一直传递到 JMS_OUT_QUEUE。在此消息流之外,回复消息被放回 JMS_IN_QUEUE,然后由 JmsGatewayIn 拾取。此时,消息被处理并放入 out.ch (我知道响应到达 out.ch,因为我在那里有一个记录器拦截器记录放置在那里的消息)但是,网关永远不会收到响应。

此消息流之外的系统从 JMS_OUT_QUEUE 获取消息并将响应放置在 JMS_IN_QUEUE 中,而不是响应,而是接收javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object在自己的 JmsOutboundgateway 上接收到一个(我认为它无法通过查看日志来反序列化 jms 回复对象) .

我显然没有正确配置某些东西,但我不知道到底是什么。有谁知道我错过了什么?

使用 spring-integration-core-4.0.3.RELEASE、spring-integration-jms-4.0.3.RELEASE、spring-integration-java-dsl-1.0.0.M2、spring-jms-4.0.6.RELEASE。

我的网关配置如下:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyChannel = "out.ch", 
        replyTimeout = 45000)
    AResponse process(ARequest request);
}

我的集成流程配置如下:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Bean(name = "out.ch")
    public DirectChannel outCh() {
        return new DirectChannel();
    }   

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .correlationKey("JMSCorrelationID")
                .get();
    }

    @Bean
    public IntegrationFlow responseFlow() {

        return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory)
                .destination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .channel("out.ch")
                .get();
    }
}

感谢您对此的任何帮助,PM。

4

1 回答 1

2

首先你的配置不好:

  1. 由于您从您开始流程,WsGateway#process您真的应该在那里等待回复。网关的请求/回复能力基于TemporaryReplyChannel,它被放置到headersas non-serializable 值。

  2. 只要您等待依赖该网关,实际上没有理由提供replyChannel, 如果您不打算对回复执行一些发布-订阅逻辑。

  3. 当您向 JMS 队列发送消息时,您应该了解消费者部分可能是一个单独的远程应用程序。最后一个可能对您的out.ch.

  4. JMS 请求/回复能力确实基于JMSCorrelationID,但还不够。这里还有一件事是ReplyToJMS 标头。因此,如果您要从消费者那里发送回复,您真的应该只依赖这些JmsGatewayIn东西。

所以我会把你的代码改成这样:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyTimeout = 45000)
    AResponse process(ARequest request);
}

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {
        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .replyDestination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .get();
    }

}

让我知道,如果它适合您,或者尝试解释为什么您two-way在一种情况下使用网关one-way。也许Jms.outboundAdapter()Jms.inboundAdapter()你更有益?

更新

如何<header-channels-to-string>从 Java DSL 使用:

.enrichHeaders(e -> e.headerChannelsToString())
于 2014-07-16T12:50:11.383 回答