2

想知道在使用 Atomikos + Camel + ActiveMQ 的组合时是否有人遇到过这个问题。我正在使用此组合以交易方式从队列中剥离消息。它似乎运作良好。问题是我现在处于需要在 ActiveMQ 中打开咨询消息的情况。在我这样做之后,我注意到所有队列都在不断地重新创建连接。ActiveMQ.Advisory.Consumer.Queue 主题的泛滥证明了这一点。在 DEBUG 日志中也很明显,因为它不断创建连接、打开事务、提交事务并关闭连接。这发生在没有任何实际应用程序生成的消息的情况下。所有其他非事务队列/主题都没有这个问题。我在其他几篇文章中读到了连接池和缓存可以缓解这个问题。看来我不应该使用缓存,而且我已经在连接池了。我正在使用这个配置:

<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="poolSize">
        <value>4</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
    init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
            <prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
        </props>
    </constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close" depends-on="userTransactionService">
    <property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

它使用了我认为实现池的 AtomikosConnectionFactoryBean。也许我错了?我很想知道是否有其他人和我一起在这艘船上,以及他们做了什么来修复它。

@PeterSmith 建议实施

彼得,谢谢你的建议。我将配置更改为使用 XaPooledConnectionFactory。春天对此并不满意。它认为 XaPooledConnectionFactory 没有实现 XAConnectionFactory。

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="maxPoolSize">
        <value>32</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsPooledConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
     init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean> 
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>

java.lang.IllegalStateException:无法将类型 [org.apache.activemq.pool.XaPooledConnectionFactory] ​​的值转换为属性“xaConnectionFactory”所需的类型 [javax.jms.XAConnectionFactory]:找不到匹配的编辑器或转换策略

文档指出 XaPooledConnectionFactory 类实现了 javax.jms.XAConnectionFactory,所以我现在有点迷失了。看来这应该有效。

4

1 回答 1

1

使用上述组合,我也看到了这一点。

似乎是 Spring DMLC(它正在实现骆驼 jms 消费者)正在使用一个 consumer.receive() 循环来能够在 XA 事务中登记读取操作。

如果没有在 DMLC 中进行 XA 缓存,甚至没有使用不轮询的 SMLC,都是可能的。

尝试将您的 activemq CF 包装在 org.apache.activemq.pool.PooledConnectionFactory 中,看看是否有帮助。

于 2013-01-23T08:11:27.573 回答