0

我正在将 Spring Batch 分区作业(XML 配置)从 Spring Batch 2.2.7 / Spring 3.2 迁移到 Spring Batch 4.0.2 / Spring 5.0.12。战争文件部署在带有 ActiveMQ Artemis 的 Wildfly 11 上。整体方法使用 x 个集群应用程序服务器并将分区作业划分为 y 个分区,每个服务器具有 y/x 个侦听器,因此负载在集群周围均匀分布。

我们在所有分区批处理作业中使用 1 个队列用于传出消息和 1 个队列用于传入消息。所有作业共享一个 JmsInboundGateway,例如:

<int-jms:inbound-gateway 
        id="springbatch.master.inbound.gateway" 
        connection-factory="springbatch.listener.jmsConnectionFactory" 
        request-channel="springbatch.slave.jms.request" 
        request-destination="springbatch.partition.jms.requestsQueue" 
        concurrent-consumers="${springbatch.partition.concurrent.consumers}" 
        max-concurrent-consumers="${springbatch.partition.concurrent.maxconsumers}" 
        max-messages-per-task="${springbatch.partition.concurrent.maxmessagespertask}"/>

    <int:service-activator 
        input-channel="springbatch.slave.jms.request" 
        output-channel="springbatch.slave.jms.response" 
        ref="springbatch.stepExecutionRequestHandler"/>   

每个作业都有一个出站网关,定义如下:

<int-jms:outbound-gateway 
    connection-factory="springbatch.jmsConnectionFactory" 
    request-channel="partitioned.jms.requests" 
    request-destination="partition.jms.requestsQueue" 
    reply-channel="partitioned.jms.reply" 
    reply-destination="partition.jms.repliesQueue"
    receive-timeout="partitioned.timeout}"
    correlation-key="JMSCorrelationID" >
    <int-jms:reply-listener cache-level="0" />        
</int-jms:outbound-gateway>

<int:aggregator 
    input-channel="partitioned.jms.reply" 
    ref="partitioned.jms.handler"/>

基于集成架构更改,我们从入站网关中删除了 JMSCorrelationId 和回复侦听器。

对于最初的集成工作,我只定义了入站网关,Wildfly 抛出了以下异常:

[org.springframework.jms.listener.DefaultMessageListenerContainer] (springbatch.master.inbound.gateway.container-2) Setup of JMS message listener invoker failed for destination 'ActiveMQQueue[partitionRequestQueue]' - trying to recover. Cause: Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6: javax.jms.IllegalStateException: Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6     
    at org.apache.activemq.artemis.ra.ActiveMQRASessionFactoryImpl.allocateConnection(ActiveMQRASessionFactoryImpl.java:817)    
    at org.apache.activemq.artemis.ra.ActiveMQRASessionFactoryImpl.createSession(ActiveMQRASessionFactoryImpl.java:531)     
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:208)  
    at org.springframework.jms.listener.DefaultMessageListenerContainer.access$1500(DefaultMessageListenerContainer.java:125)

由于此错误,是否有不同的方法来定义侦听器的数量

Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6

更新以下问题

当 Wildfly 启动时,我看到这条线

WFLYJCA0002: Bound JCA ConnectionFactory [java:/JmsXA]

我正在使用 java:/JmsXA

这是一个 Spring Boot 应用程序,它在定义的 5 个 JmsListener 的日志中正常工作。

2019-01-16 06:30:54,667 DEBUG DefaultMessageListenerContainer] (ServerService Thread Pool -- 67) Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@a877e28

当我添加 Jms-inbound-gateway 的定义时,我开始看到上面列出的错误。

模式问题

定义入站网关的 XML 具有以下架构定义

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd

之前的代码(批处理 2.2)具有以下架构定义:

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd

我刚刚更新了

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-5.0.xsd

我可以重新添加 JMSCoordinationID 以解决该问题。

但是,当我包含 jms-inbound-gateway 时,服务器启动时仍然出现 with ActiveMQ 错误。

4

1 回答 1

0

为了解决 J2ee Spec 的问题,我需要将 cache-level=0 添加到入站网关。

<int-jms:inbound-gateway 
    id="springbatch.master.inbound.gateway" 
    connection-factory="springbatch.jmsConnectionFactory" 
    request-channel="springbatch.slave.jms.request" 
    request-destination="springbatch.partition.jms.requestsQueue" 
    reply-channel="springbatch.slave.jms.response" 
    concurrent-consumers="${springbatch.partition.concurrent.consumers}" 
    max-concurrent-consumers="${springbatch.partition.concurrent.maxconsumers}" 
    max-messages-per-task="${springbatch.partition.concurrent.maxmessagespertask}"
    reply-time-to-live="${springbatch.partition.reply.time.to.live}"  
    cache-level="0"
/>
于 2019-01-18T15:26:44.300 回答