1

我在 Jboss EAP 6.1 中使用默认的消息传递子系统我已经在我的组件中实现了事务。在地址设置配置中,我指定了以下配置。地址设置配置:

 <address-setting match="jms.queue.Src.in">
                        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                        <redelivery-delay>5000</redelivery-delay>
                        <max-delivery-attempts>4</max-delivery-attempts>
 </address-setting>

我在 switchyard 组件中指定了全局事务,当发生异常时,事务被回滚,重试后,消息被放置在死信队列中。与此同时,我需要构建错误消息并将其放置在错误队列中供消费者使用选择它。消息将进入错误队列,但由于出现异常,事务未提交。

开关场 XML:

<?xml version="1.0" encoding="UTF-8"?>
<sy:switchyard xmlns:camel="urn:switchyard-component-camel:config:1.1" xmlns:jms="urn:switchyard-component-camel-jms:config:1.1" xmlns:sca="http://docs.oasis-open.org/ns/opencsa/sca/200912" xmlns:sy="urn:switchyard-config:switchyard:1.1" name="JmsToSca" targetNamespace="urn:com.company.esb.prototypes.consumer.async:JmsToSca:1.0">
  <sca:composite name="JmsToSca" targetNamespace="urn:com.company.esb.prototypes.consumer.async:JmsToSca:1.0">
    <sca:component name="RequestJmsToSca">
      <camel:implementation.camel requires="managedTransaction.Global">
        <camel:xml path="route/camelroutesend.xml"/>
      </camel:implementation.camel>
      <sca:service name="RequestInService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      </sca:service>
      <sca:reference name="RequestOutService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      </sca:reference>
      <sca:reference name="ResponseInService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      </sca:reference>
      <sca:reference name="AuditLoggerService" requires="suspendsTransaction">
        <sca:interface.java interface="com.company.esb.common.services.AuditLoggerService"/>
      </sca:reference>
      <sca:reference name="ExceptionHandlerService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.ExceptionHandlerService"/>
      </sca:reference>
    </sca:component>
    <sca:service name="RequestInService" promote="RequestJmsToSca/RequestInService">
      <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      <jms:binding.jms name="jms1">
        <jms:contextMapper includes="JMSMessageID.*,JMSXDeliveryCount.*"/>
        <jms:queue>${Consumer.Async.JmsToSca.RequestQueueName}</jms:queue>
        <jms:connectionFactory>${Consumer.Async.JmsToSca.ConnectionFactoryname}</jms:connectionFactory>
        <jms:disableReplyTo>true</jms:disableReplyTo>
        <jms:transacted>true</jms:transacted>
        <jms:transactionManager>#jtaTransactionManager</jms:transactionManager>
      </jms:binding.jms>
    </sca:service>
    <sca:reference name="RequestOutService" multiplicity="0..1" promote="RequestJmsToSca/RequestOutService">
      <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      <sca:binding.sca sy:target="RequestInService" sy:targetNamespace="urn:com.company.esb.prototypes.consumer.async:AsyncScaToJms:1.0" name="sca1"/>
    </sca:reference>
    <sca:reference name="AuditLoggerService" multiplicity="0..1" promote="RequestJmsToSca/AuditLoggerService">
      <sca:interface.java interface="com.company.esb.common.services.AuditLoggerService"/>
      <sca:binding.sca sy:target="AuditLoggerService" sy:targetNamespace="urn:com.company.esb.components:AuditLogger:1.0" name="ScaAudit"/>
    </sca:reference>
    <sca:reference name="ExceptionHandlerService" multiplicity="0..1" promote="RequestJmsToSca/ExceptionHandlerService">
      <sca:interface.java interface="com.company.esb.common.services.ExceptionHandlerService"/>
      <sca:binding.sca sy:target="ExceptionHandlerService" sy:targetNamespace="urn:com.company.esb.components:ExceptionHandler:1.0" name="ScaExceptionHandler"/>
    </sca:reference>
    <sca:component name="ResponseScaToJms">
      <camel:implementation.camel requires="managedTransaction.Global">
        <camel:xml path="route/camelroutereceive.xml"/>
      </camel:implementation.camel>
      <sca:service name="ResponseInService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      </sca:service>
      <sca:reference name="AuditLoggerService" requires="suspendsTransaction">
        <sca:interface.java interface="com.company.esb.common.services.AuditLoggerService"/>
      </sca:reference>
      <sca:reference name="ExceptionHandlerService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.ExceptionHandlerService"/>
      </sca:reference>
      <sca:reference name="ResponseOutService" requires="propagatesTransaction">
        <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      </sca:reference>
    </sca:component>
    <sca:service name="ResponseInService" promote="ResponseScaToJms/ResponseInService">
      <sca:interface.java interface="com.company.esb.common.services.esbMessageService"/>
      <sca:binding.sca name="sca1"/>
    </sca:service>
    <sca:reference name="ResponseOutService" multiplicity="0..1" promote="ResponseScaToJms/ResponseOutService">
      <sca:interface.java interface="com.company.esb.common.services.StringMessageService"/>
      <jms:binding.jms name="jms1">
        <jms:contextMapper excludes="JMSXDeliveryCount.*" includes="JMSCorrelationID.*"/>
        <jms:queue>${Consumer.Async.JmsToSca.ResponseQueueName}</jms:queue>
        <jms:connectionFactory>${Consumer.Async.JmsToSca.ConnectionFactoryname}</jms:connectionFactory>
        <jms:transacted>false</jms:transacted>
      </jms:binding.jms>
    </sca:reference>
  </sca:composite>
  <sy:domain>
    <sy:properties>
      <sy:property name="org.switchyard.handlers.messageTrace.enabled" value="true"/>
      <sy:property name="org.switchyard.propagateExceptionOnInOnly" value="true"/>
    </sy:properties>
  </sy:domain>
</sy:switchyard>

骆驼路线(camelroutesend.xml):

<?xml version="1.0" encoding="ASCII"?>
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="ConsumerAsyncJmsToScaRoute" streamCache="true">
        <from uri="switchyard://RequestInService" />
        <transacted ref="transactionPolicy" />
        <doTry>
            <bean ref="MessageGenerator" method="createesbMessage()" />
            <removeHeader headerName="JMSMessageID" />
            <bean ref="esbHelper" method="prepareAuditCall()" />
            <wireTap uri="switchyard://AuditLoggerService" />
            <setProperty propertyName="OriginalesbMessage">
                <simple>${body}</simple>
            </setProperty>

            <!-- Step3- invokes next pattern -->
            <to uri="switchyard://RequestOutService" />
            <doCatch>
                <!-- Step7- captures error in the route and invokes esbErrorHandler -->
                <exception>java.lang.Exception</exception>
                <handled>
                    <constant>true</constant>
                </handled>
                <log message="called Exception block ConsumerAsyncJmsToScaRoute ${body}"/>
                <bean ref="esbHelper" method="prepareErrorHandlerCall(false)" />
                <to uri="switchyard://ExceptionHandlerService" />
                <to uri="switchyard://ResponseInService" />
                <!-- Step8- Prepares response message in case of error -->
            </doCatch>
        </doTry>
    </route>
</routes>

骆驼路线(camelroutereceive.xml):

<?xml version="1.0" encoding="ASCII"?>
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="ConsumerAsyncScaToJmsRoute" streamCache="true">
        <from uri="switchyard://ResponseInService" />
        <transacted ref="transactionPolicy" />
        <doTry>
            <bean ref="esbHelper" method="prepareAuditCall()" />
            <wireTap uri="switchyard://AuditLoggerService" />
            <setProperty propertyName="OriginalesbMessage">
                <simple>${body}</simple>
            </setProperty>

            <to uri="xslt:xslt/create-response.xslt" />

            <bean ref="MessageGenerator" method="setCorrelationID()" />
            <!-- Step3- invokes Provider service -->
            <!--<to uri="switchyard://ResponseOutService" /> -->
            <to
                uri="jms:queue:{{Consumer.Async.JmsToSca.ResponseQueueName}}?connectionFactory={{Consumer.Async.JmsToSca.ConnectionFactoryname}}&amp;transacted=true"
                pattern="InOnly" />

            <doCatch>
                <!-- Step7- captures error in the route and invokes esbErrorHandler -->
                <exception>java.lang.Exception</exception>
                <handled>
                    <constant>true</constant>
                </handled>
                <bean ref="esbHelper" method="prepareErrorHandlerCall(false)" />
                <to uri="switchyard://ExceptionHandlerService" />
                <!-- Step8- Prepares response message in case of error -->
            </doCatch>
        </doTry>
    </route>
</routes>
4

0 回答 0