I am setting up Camel with ActiveMQ and Atomikos, using transacted JMS. When a processor throws a RuntimeException, I expect ActiveMQ to redeliver the message, but the message is not returned to the queue. Please see my configuration below.

<amq:broker id="my-broker" useJmx="true" persistent="false"
    brokerName="localhost" schedulerSupport="true">
    <amq:plugins>
        <amq:redeliveryPlugin fallbackToDeadLetter="true"
            sendToDlqIfMaxRetriesExceeded="true">
            <amq:redeliveryPolicyMap>
                <amq:redeliveryPolicyMap>
                    <!-- <amq:redeliveryPolicyEntries> <amq:redeliveryPolicy queue="SpecialQueue" 
                        maximumRedeliveries="4" redeliveryDelay="10000" /> </amq:redeliveryPolicyEntries> -->
                    <!-- the fallback policy for all other destinations -->
                    <amq:defaultEntry>
                        <amq:redeliveryPolicy maximumRedeliveries="4"
                            initialRedeliveryDelay="1000" redeliveryDelay="2000" />
                    </amq:defaultEntry>
                </amq:redeliveryPolicyMap>
            </amq:redeliveryPolicyMap>
        </amq:redeliveryPlugin>
    </amq:plugins>
    <amq:transportConnectors>
        <amq:transportConnector uri="tcp://localhost:61616" />
    </amq:transportConnectors>
</amq:broker>

<!-- use the XA-specific version of the connection factory -->
<bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"
    depends-on="my-broker">
    <property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
    <property name="uniqueResourceName" value="amq1" />
    <property name="xaConnectionFactory" ref="amq.connectionFactory" />
</bean>

<bean id="jta.transactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager">
        <bean class="com.atomikos.icatch.jta.UserTransactionManager"
            init-method="init" destroy-method="close">
            <property name="forceShutdown" value="false" />
        </bean>
    </property>
    <property name="userTransaction">
        <bean class="com.atomikos.icatch.jta.UserTransactionImp">
            <property name="transactionTimeout" value="300" />
        </bean>
    </property>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="amq.connectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jta.transactionManager" />
</bean>

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>
<bean id="service1" class="com.soelvar.camel.processor.impl.Service1Impl" />

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <camel:onException>
        <camel:exception>java.lang.Throwable</camel:exception>
        <camel:redeliveryPolicy redeliveryDelay="1000"
            maximumRedeliveries="3"></camel:redeliveryPolicy>
        <camel:handled>
            <constant>false</constant>
        </camel:handled>
        <!-- <camel:rollback markRollbackOnly="true" /> -->
    </camel:onException>

    <route id="orderProcessingQueue">
        <from uri="jms:orderProcessingQueue" />
        <to uri="bean:service1?method=process" />
    </route>
</camelContext>

The log output and stack trace I get are below. Camel first retries 3 times, then the RuntimeException is propagated and the transaction is rolled back, but nothing is put back on the queue and ActiveMQ does not retry.

[Consumer[orderProcessingQueue]] TaskManager                    INFO  THREADS: using JDK thread pooling...
[hread #3 - JmsConsumer[orders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000300117
[Consumer[orderProcessingQueue]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000100117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000200117
null
com.soelvar.camel.processor.impl.Service1Impl
############################1##############################
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000300117
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000200117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000400117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000500117
com.soelvar.camel.processor.impl.Service1Impl
############################2##############################
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000400117
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000500117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000600117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000700117
com.soelvar.camel.processor.impl.Service1Impl
############################3##############################
com.soelvar.camel.processor.impl.Service1Impl
############################4##############################
[Consumer[orderProcessingQueue]] DefaultErrorHandler            ERROR Failed delivery for (MessageId: ID:WS340278-50162-1370387698217-5:4:1:1:1 on ExchangeId: ID-WS340278-50165-1370387698547-0-3). Exhausted after delivery attempt: 4 caught: java.lang.RuntimeException
java.lang.RuntimeException
    at com.soelvar.camel.processor.impl.Service1Impl.process(Service1Impl.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:391)
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:278)
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:67)
    at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:101)
    at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:244)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1069)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1061)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
[Consumer[orderProcessingQueue]] EndpointMessageListener        WARN  Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.lang.RuntimeException]
org.apache.camel.RuntimeCamelException: java.lang.RuntimeException
    at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1338)
    at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:187)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:108)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:244)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1069)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1061)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException
    at com.soelvar.camel.processor.impl.Service1Impl.process(Service1Impl.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:391)
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:278)
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:67)
    at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:101)
    at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)
    ... 11 more
[Consumer[orderProcessingQueue]] CompositeTransactionImp        INFO  rollback() done of transaction 10.28.118.149.tm0000100117
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000600117
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000700117
[Consumer[orderProcessingQueue]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000800117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0000900117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0001000117
[Consumer[orderProcessingQueue]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000800117
[Consumer[orderProcessingQueue]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0001100117
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0000900117
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp        INFO  commit() done (by application) of transaction 10.28.118.149.tm0001000117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager         INFO  createCompositeTransaction ( 300000 ): created new ROOT transaction with id 10.28.118.149.tm0001200117
4

2 Answers

I have done some debugging of my configuration and found which part of it was preventing the ActiveMQ retries from happening.

The JmsConfiguration below does not work.

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="amq.connectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jta.transactionManager" />
</bean>

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

But this does.

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jta.transactionManager" />
</bean>
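
Note that the working bean above does not set a connection factory at all, so the component presumably falls back to its default broker connection. A variant that points the component at the Atomikos-wrapped factory from the question could look like the sketch below. It is untested and rests on two assumptions: that the `xa.connectionFactory` and `jta.transactionManager` beans are defined as in the question, and that disabling consumer caching (`CACHE_NONE`) is appropriate here, which is a commonly suggested setting for XA but not something verified against this setup.

```xml
<!-- Sketch only: same component as above, but wired to the Atomikos-wrapped
     XA connection factory (bean ids taken from the question). -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <!-- assumption: use the Atomikos wrapper rather than the raw ActiveMQ factory -->
    <property name="connectionFactory" ref="xa.connectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jta.transactionManager" />
    <!-- assumption: no consumer/session caching when running under XA -->
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
```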

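Unfortunately, I now cannot get ActiveMQ to pick up the redeliveryPlugin configuration, but I have seen that a defect has been raised for this issue. Does anyone know a workaround? I believe this is specific to org.apache.activemq.ActiveMQXAConnectionFactory.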
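
One thing that may be worth trying is the client-side redelivery policy, which is set on the connection factory rather than in the broker plugin. The sketch below reuses the values from the broker configuration in the question. The bean id `amq.redeliveryPolicy` is made up for illustration, and whether this policy is actually honoured for XA sessions in this setup is an assumption that has not been confirmed.

```xml
<!-- Sketch only: client-side redelivery settings mirroring the broker defaultEntry. -->
<bean id="amq.redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="4" />
    <property name="initialRedeliveryDelay" value="1000" />
    <property name="redeliveryDelay" value="2000" />
</bean>

<!-- The XA connection factory from the question, with the policy attached. -->
<bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"
    depends-on="my-broker">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="redeliveryPolicy" ref="amq.redeliveryPolicy" />
</bean>
```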

Cheers, Jesper

answered 2013-06-06T06:04:47.387

See this page about transactions: http://camel.apache.org/transactional-client.html

If you have a copy of the book Camel in Action, chapter 9 is all about transactions.

You need to add <transacted/> to the route so that Camel uses the TX manager.
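
As a rough sketch, applied to the route from the question, that could look like the following. The policy bean id `PROPAGATION_REQUIRED` is a name chosen here for illustration, and the sketch assumes the `jta.transactionManager` bean from the question is the transaction manager to use.

```xml
<!-- Sketch only: a transaction policy bound to the JTA transaction manager. -->
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jta.transactionManager" />
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="orderProcessingQueue">
        <from uri="jms:orderProcessingQueue" />
        <!-- marks the route as transacted so Camel uses the TX manager -->
        <transacted ref="PROPAGATION_REQUIRED" />
        <to uri="bean:service1?method=process" />
    </route>
</camelContext>
```

With only one policy or transaction manager in the Spring context, a plain `<transacted/>` without `ref` should also resolve it, but naming the policy explicitly avoids ambiguity.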

answered 2013-06-05T08:53:40.657