我在 JBoss EAP 6.1 中使用默认的消息传递子系统,并且已经在我的组件中实现了事务。在地址设置(address-setting)中,我指定了以下配置:
<!-- Delivery settings applied to addresses matching jms.queue.Src.in. -->
<address-setting match="jms.queue.Src.in">
    <!-- Address named as the dead-letter target for this match. -->
    <dead-letter-address>jms.queue.DLQ</dead-letter-address>
    <!-- Address named as the expiry target for this match. -->
    <expiry-address>jms.queue.ExpiryQueue</expiry-address>
    <!-- NOTE(review): unit is presumably milliseconds; confirm against the broker docs. -->
    <redelivery-delay>5000</redelivery-delay>
    <!-- Delivery attempt limit configured for this match. -->
    <max-delivery-attempts>4</max-delivery-attempts>
</address-setting>
我在 SwitchYard 组件中指定了全局事务:当发生异常时,事务会被回滚,重试之后,消息会被放入死信队列。与此同时,我还需要构建一条错误消息并将其放入错误队列,供消费者读取。消息会被发送到错误队列,但由于出现了异常,事务没有提交。
SwitchYard XML:
<?xml version="1.0" encoding="UTF-8"?>
<!-- SwitchYard descriptor for the JmsToSca application: two Camel-implemented
     components, the composite-level services/references that promote them,
     and two domain properties. -->
<sy:switchyard xmlns:camel="urn:switchyard-component-camel:config:1.1"
               xmlns:jms="urn:switchyard-component-camel-jms:config:1.1"
               xmlns:sca="http://docs.oasis-open.org/ns/opencsa/sca/200912"
               xmlns:sy="urn:switchyard-config:switchyard:1.1"
               name="JmsToSca"
               targetNamespace="urn:com.company.esb.prototypes.consumer.async:JmsToSca:1.0">
  <sca:composite name="JmsToSca"
                 targetNamespace="urn:com.company.esb.prototypes.consumer.async:JmsToSca:1.0">

    <!-- Request component: implemented by the Camel route file
         route/camelroutesend.xml and marked managedTransaction.Global. -->
    <sca:component name="RequestJmsToSca">
      <camel:implementation.camel requires="managedTransaction.Global">
        <camel:xml path="route/camelroutesend.xml"/>
      </camel:implementation.camel>
      <sca:service name="RequestInService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      </sca:service>
      <sca:reference name="RequestOutService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      </sca:reference>
      <sca:reference name="ResponseInService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      </sca:reference>
      <!-- The audit reference is the only one declared with suspendsTransaction. -->
      <sca:reference name="AuditLoggerService" requires="suspendsTransaction">
        <sca:interface.java interface="com.company.esb.common.services.AuditLoggerService"/>
      </sca:reference>
      <sca:reference name="ExceptionHandlerService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.ExceptionHandlerService"/>
      </sca:reference>
    </sca:component>

    <!-- Composite service promoting RequestJmsToSca/RequestInService through a
         JMS binding; queue and connection factory come from property placeholders. -->
    <sca:service name="RequestInService" promote="RequestJmsToSca/RequestInService">
      <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      <jms:binding.jms name="jms1">
        <jms:contextMapper includes="JMSMessageID.*,JMSXDeliveryCount.*"/>
        <jms:queue>${Consumer.Async.JmsToSca.RequestQueueName}</jms:queue>
        <jms:connectionFactory>${Consumer.Async.JmsToSca.ConnectionFactoryname}</jms:connectionFactory>
        <jms:disableReplyTo>true</jms:disableReplyTo>
        <!-- Transacted binding using the #jtaTransactionManager reference. -->
        <jms:transacted>true</jms:transacted>
        <jms:transactionManager>#jtaTransactionManager</jms:transactionManager>
      </jms:binding.jms>
    </sca:service>

    <!-- Composite references with SCA bindings targeting services in other
         application namespaces. -->
    <sca:reference name="RequestOutService"
                   multiplicity="0..1"
                   promote="RequestJmsToSca/RequestOutService">
      <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      <sca:binding.sca sy:target="RequestInService"
                       sy:targetNamespace="urn:com.company.esb.prototypes.consumer.async:AsyncScaToJms:1.0"
                       name="sca1"/>
    </sca:reference>
    <sca:reference name="AuditLoggerService"
                   multiplicity="0..1"
                   promote="RequestJmsToSca/AuditLoggerService">
      <sca:interface.java interface="com.company.esb.common.services.AuditLoggerService"/>
      <sca:binding.sca sy:target="AuditLoggerService"
                       sy:targetNamespace="urn:com.company.esb.components:AuditLogger:1.0"
                       name="ScaAudit"/>
    </sca:reference>
    <sca:reference name="ExceptionHandlerService"
                   multiplicity="0..1"
                   promote="RequestJmsToSca/ExceptionHandlerService">
      <sca:interface.java interface="com.company.esb.common.services.ExceptionHandlerService"/>
      <sca:binding.sca sy:target="ExceptionHandlerService"
                       sy:targetNamespace="urn:com.company.esb.components:ExceptionHandler:1.0"
                       name="ScaExceptionHandler"/>
    </sca:reference>

    <!-- Response component: implemented by the Camel route file
         route/camelroutereceive.xml and marked managedTransaction.Global. -->
    <sca:component name="ResponseScaToJms">
      <camel:implementation.camel requires="managedTransaction.Global">
        <camel:xml path="route/camelroutereceive.xml"/>
      </camel:implementation.camel>
      <sca:service name="ResponseInService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      </sca:service>
      <sca:reference name="AuditLoggerService" requires="suspendsTransaction">
        <sca:interface.java interface="com.company.esb.common.services.AuditLoggerService"/>
      </sca:reference>
      <sca:reference name="ExceptionHandlerService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.ExceptionHandlerService"/>
      </sca:reference>
      <sca:reference name="ResponseOutService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      </sca:reference>
    </sca:component>

    <!-- Composite service promoting ResponseScaToJms/ResponseInService over SCA. -->
    <sca:service name="ResponseInService" promote="ResponseScaToJms/ResponseInService">
      <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      <sca:binding.sca name="sca1"/>
    </sca:service>

    <!-- Composite reference with a JMS binding to the response queue; this
         binding is declared with transacted=false. -->
    <sca:reference name="ResponseOutService"
                   multiplicity="0..1"
                   promote="ResponseScaToJms/ResponseOutService">
      <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      <jms:binding.jms name="jms1">
        <jms:contextMapper excludes="JMSXDeliveryCount.*" includes="JMSCorrelationID.*"/>
        <jms:queue>${Consumer.Async.JmsToSca.ResponseQueueName}</jms:queue>
        <jms:connectionFactory>${Consumer.Async.JmsToSca.ConnectionFactoryname}</jms:connectionFactory>
        <jms:transacted>false</jms:transacted>
      </jms:binding.jms>
    </sca:reference>
  </sca:composite>

  <!-- Domain-level properties: message tracing and exception propagation on
       in-only exchanges are both switched on. -->
  <sy:domain>
    <sy:properties>
      <sy:property name="org.switchyard.handlers.messageTrace.enabled" value="true"/>
      <sy:property name="org.switchyard.propagateExceptionOnInOnly" value="true"/>
    </sy:properties>
  </sy:domain>
</sy:switchyard>
Camel 路由(camelroutesend.xml):
<?xml version="1.0" encoding="ASCII"?>
<!-- Request route: consumes from the RequestInService SwitchYard endpoint,
     builds and audits the message, then forwards it to RequestOutService.
     Any java.lang.Exception raised in the doTry body is handled in doCatch. -->
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="ConsumerAsyncJmsToScaRoute" streamCache="true">
    <from uri="switchyard://RequestInService"/>
    <transacted ref="transactionPolicy"/>
    <doTry>
      <bean ref="MessageGenerator" method="createesbMessage()"/>
      <removeHeader headerName="JMSMessageID"/>

      <!-- Audit call is sent through a wire tap. -->
      <bean ref="esbHelper" method="prepareAuditCall()"/>
      <wireTap uri="switchyard://AuditLoggerService"/>

      <!-- Keep the current body in an exchange property before forwarding. -->
      <setProperty propertyName="OriginalesbMessage">
        <simple>${body}</simple>
      </setProperty>

      <!-- Step3- invokes next pattern -->
      <to uri="switchyard://RequestOutService"/>

      <doCatch>
        <!-- Step7- captures error in the route and invokes esbErrorHandler -->
        <exception>java.lang.Exception</exception>
        <handled>
          <constant>true</constant>
        </handled>
        <log message="called Exception block ConsumerAsyncJmsToScaRoute ${body}"/>
        <bean ref="esbHelper" method="prepareErrorHandlerCall(false)"/>
        <to uri="switchyard://ExceptionHandlerService"/>
        <to uri="switchyard://ResponseInService"/>
        <!-- Step8- Prepares response message in case of error -->
      </doCatch>
    </doTry>
  </route>
</routes>
Camel 路由(camelroutereceive.xml):
<?xml version="1.0" encoding="ASCII"?>
<!-- Response route: consumes from the ResponseInService SwitchYard endpoint,
     audits the message, transforms it with xslt/create-response.xslt and sends
     the result to the response JMS queue. Any java.lang.Exception raised in
     the doTry body is handled in doCatch and passed to ExceptionHandlerService. -->
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="ConsumerAsyncScaToJmsRoute" streamCache="true">
    <from uri="switchyard://ResponseInService"/>
    <transacted ref="transactionPolicy"/>
    <doTry>
      <!-- Audit call is sent through a wire tap. -->
      <bean ref="esbHelper" method="prepareAuditCall()"/>
      <wireTap uri="switchyard://AuditLoggerService"/>

      <!-- Keep the current body in an exchange property before transforming. -->
      <setProperty propertyName="OriginalesbMessage">
        <simple>${body}</simple>
      </setProperty>
      <to uri="xslt:xslt/create-response.xslt"/>
      <bean ref="MessageGenerator" method="setCorrelationID()"/>

      <!-- Step3- invokes Provider service -->
      <!--<to uri="switchyard://ResponseOutService" /> -->
      <!-- The query-string separator is written as &amp; because a bare
           ampersand inside an attribute value is not well-formed XML. -->
      <to uri="jms:queue:{{Consumer.Async.JmsToSca.ResponseQueueName}}?connectionFactory={{Consumer.Async.JmsToSca.ConnectionFactoryname}}&amp;transacted=true"
          pattern="InOnly"/>

      <doCatch>
        <!-- Step7- captures error in the route and invokes esbErrorHandler -->
        <exception>java.lang.Exception</exception>
        <handled>
          <constant>true</constant>
        </handled>
        <bean ref="esbHelper" method="prepareErrorHandlerCall(false)"/>
        <to uri="switchyard://ExceptionHandlerService"/>
        <!-- Step8- Prepares response message in case of error -->
      </doCatch>
    </doTry>
  </route>
</routes>