我正在开发一个将 XML 作为输入的应用程序,然后使用 XSD 对其进行验证,如果文件有效,它将被移动到“File_IN_Queue”
第二个流程是从“File_IN_Queue”中获取文件并对数据库日志进行一些处理,然后所有文件将被移动到“File_OUT_Queue”。
第三个流程是从“File_OUT_Queue”中获取文件并使用文件出站将其放入某个文件夹
现在我想实现的是:如果这些流程中的任何一个失败,文件仍应保留在对应的队列中。例如,如果第二个流程失败,文件应保留在“File_IN_Queue”中;如果第三个流程失败,文件应保留在“File_OUT_Queue”中。
我之所以使用 XA 事务,是因为第二个流程在处理文件时还会涉及许多其他队列,例如电子邮件队列、异常队列等。
所以我该怎么做?我应该如何实施?
目前,在第二个流程中,我在其起点 File_IN_Queue 上将 XA 事务设置为“始终开始”(ALWAYS_BEGIN);在第三个流程中,同样在 File_OUT_Queue 上设置为“始终开始”。处理过程中涉及的所有其他队列都设置为“始终加入”(ALWAYS_JOIN)。
xml配置
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
xmlns:smtp="http://www.mulesoft.org/schema/mule/smtp" xmlns:smtps="http://www.mulesoft.org/schema/mule/smtps"
xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:context="http://www.springframework.org/schema/context"
xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.3.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/smtps http://www.mulesoft.org/schema/mule/smtps/current/mule-smtps.xsd
http://www.mulesoft.org/schema/mule/smtp http://www.mulesoft.org/schema/mule/smtp/current/mule-smtp.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd">
<spring:beans>
    <!-- Spring beans used by the flows in this file: data access objects, the Hibernate
         session factory, the RBO lookup components and the ActiveMQ XA connection factory.
         Every bean is addressed by its id. -->

    <!-- Data access objects. -->
    <spring:bean id="springApplicationContext" class="com.util.SpringApplicationContext"/>
    <!-- checkPointDAO is the only DAO declared with prototype scope. -->
    <spring:bean id="checkPointDAO" class="com.dao.impl.CheckPointDAO" scope="prototype"/>
    <spring:bean id="buyerDAO" class="com.dao.impl.BuyerDAO"/>
    <spring:bean id="supplierDAO" class="com.dao.impl.SupplierDAO"/>
    <spring:bean id="locationDAO" class="com.dao.impl.LocationDAO"/>
    <spring:bean id="shipToDetailDAO" class="com.dao.impl.ShipToDetailDAO"/>
    <spring:bean id="contactDAO" class="com.dao.impl.ContactDAO"/>
    <spring:bean id="exceptionDAO" class="com.dao.impl.ExceptionDAO"/>
    <spring:bean id="messageDAO" class="com.dao.impl.MessageDAO"/>
    <spring:bean id="addressDAO" class="com.dao.impl.AddressDAO"/>
    <spring:bean id="xmlDAO" class="com.dao.impl.XmlDAO"/>

    <!-- Hibernate session factory. Entity packages, JDBC connection settings and the
         DBCP pool sizes are all supplied through property placeholders. -->
    <spring:bean id="sessionFactory"
                 class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">
        <spring:property name="packagesToScan" value="${hibernate.packagesToScan}"/>
        <!-- The same listener class is registered for both pre-insert and pre-update.
             The listeners are inner beans, which the container treats as anonymous,
             so no id is declared on them (the original declared the same id twice). -->
        <spring:property name="eventListeners">
            <spring:map>
                <spring:entry key="pre-insert">
                    <spring:bean class="com.listener.PreInsertOrUpdateEventListener"/>
                </spring:entry>
                <spring:entry key="pre-update">
                    <spring:bean class="com.listener.PreInsertOrUpdateEventListener"/>
                </spring:entry>
            </spring:map>
        </spring:property>
        <spring:property name="hibernateProperties">
            <spring:props>
                <spring:prop key="hibernate.connection.driver_class">${db.driverClassName}</spring:prop>
                <spring:prop key="hibernate.connection.url">${db.url}</spring:prop>
                <spring:prop key="hibernate.connection.username">${db.username}</spring:prop>
                <spring:prop key="hibernate.connection.password">${db.password}</spring:prop>
                <spring:prop key="hibernate.dialect">${hibernate.dialect}</spring:prop>
                <spring:prop key="hibernate.show_sql">${hibernate.show_sql}</spring:prop>
                <!-- Disabled:
                     <spring:prop key="hibernate.hbm2ddl.auto">${hibernate.hbm2ddl.auto}</spring:prop> -->
                <spring:prop key="hibernate.temp.use_jdbc_metadata_defaults">${hibernate.temp.use_jdbc_metadata_defaults}</spring:prop>
                <spring:prop key="hibernate.connection.provider_class">${hibernate.connection.provider_class}</spring:prop>
                <spring:prop key="hibernate.dbcp.initialSize">${hibernate.dbcp.initialSize}</spring:prop>
                <spring:prop key="hibernate.dbcp.maxActive">${hibernate.dbcp.maxActive}</spring:prop>
                <spring:prop key="hibernate.dbcp.maxIdle">${hibernate.dbcp.maxIdle}</spring:prop>
                <spring:prop key="hibernate.dbcp.minIdle">${hibernate.dbcp.minIdle}</spring:prop>
            </spring:props>
        </spring:property>
    </spring:bean>

    <!-- Property file reader; its init() method is invoked by the container after construction. -->
    <spring:bean id="propertyFileReader" class="com.util.PropertyFileReader" init-method="init"/>

    <!-- RBO connection settings and the DAOs that inherit them through rboBaseDAO. -->
    <spring:bean id="rboConfig" class="com.rbo.bean.RboConfiguration">
        <spring:property name="url" value="${url}"/>
        <spring:property name="account" value="${account}"/>
        <spring:property name="userId" value="${userId}"/>
        <spring:property name="password" value="${password}"/>
    </spring:bean>
    <spring:bean id="rboBaseDAO" class="com.rbo.dao.RboBaseDAO">
        <spring:property name="rboConfig" ref="rboConfig"/>
    </spring:bean>
    <spring:bean id="rsInfoDAO" parent="rboBaseDAO" class="com.rbo.dao.RSDAO"/>
    <spring:bean id="rsPartInfoDAO" parent="rboBaseDAO" class="com.rbo.dao.RSPartInfoDAO"/>

    <!-- Components exposed to flows through spring-object references. -->
    <spring:bean id="symphonyInfoResourceComponent"
                 class="com.rbo.component.SymphonyInfoResourceComponent">
        <spring:property name="dao" ref="rsInfoDAO"/>
    </spring:bean>
    <spring:bean id="orderSplittingComponent" class="com.component.OrderSplittingComponent"/>
    <spring:bean id="symphonyPartInfoComponent"
                 class="com.rbo.component.SymphonyPartInfoComponent">
        <spring:property name="rsPartInfoDAO" ref="rsPartInfoDAO"/>
    </spring:bean>

    <!-- ActiveMQ redelivery settings applied through amqFactory. -->
    <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <spring:property name="maximumRedeliveries"
                         value="${jms.connector.redeliverypolicy.maximumredeliveries}"/>
        <spring:property name="initialRedeliveryDelay"
                         value="${jms.connector.redeliverypolicy.initialredeliverydelay}"/>
        <spring:property name="redeliveryDelay"
                         value="${jms.connector.redeliverypolicy.redeliverydelay}"/>
    </spring:bean>

    <!-- XA-capable ActiveMQ connection factory referenced by every JMS connector in this file.
         The broker URL is set here explicitly: a connector that is given connectionFactory-ref
         uses the referenced factory as-is, so without this property the factory would fall back
         to ActiveMQ's built-in default URL and ${jms.connector.brokerurl} would have no effect. -->
    <spring:bean id="amqFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <spring:property name="brokerURL" value="${jms.connector.brokerurl}"/>
        <spring:property name="redeliveryPolicy" ref="redeliveryPolicy"/>
    </spring:bean>
</spring:beans>
<!-- Shared exception strategy referenced by the flows in this file.
     Branch 1 is selected for exceptions caused by ${symphony.exception.strategy.class},
     branch 2 for exceptions caused by ${symphony.exception.strategy.class.throwable}.
     NOTE(review): both branches are catch-exception-strategy elements. Per the Mule 3
     documentation a caught exception is treated as handled and an active transaction is
     committed, so the triggering JMS message is presumably consumed rather than left on
     its queue. If the message has to stay queued on failure, evaluate
     rollback-exception-strategy and confirm against the Mule version in use. -->
<choice-exception-strategy name="Choice_Exception_Strategy">

    <!-- Branch 1 -->
    <catch-exception-strategy doc:name="Catch Exception Strategy"
                              when="#[exception.causedBy(${symphony.exception.strategy.class})]">
        <logger doc:name="Logger" level="INFO"
                message="byte to object conersion #[message:payload]"/>

        <!-- Hand the ROLLBACK command to TransactionComponent via a flow variable. -->
        <set-variable doc:name=" set transaction command"
                      variableName="transactionCommand"
                      value="#[com.util.TransactionType.ROLLBACK]"/>
        <component doc:name="Rollback transaction"
                   class="com.component.TransactionComponent"/>

        <custom-transformer doc:name="File Name Transformer"
                            class="com.transformer.FileNameTransformer"/>
        <component doc:name="Exception Generator Component"
                   class="com.component.ExceptionGeneratorComponent"/>

        <!-- Restore the XML that was stored in the session before routing it. -->
        <set-payload doc:name="Set Payload" value="#[sessionVars['originalXML']]"/>

        <!-- Route on the fileNamePrefix session variable. -->
        <choice doc:name="Choice">
            <when expression="sessionVars['fileNamePrefix'] == '1$#^'">
                <processor-chain>
                    <logger doc:name="Logger" level="INFO" message="firstcondition executed"/>
                    <jms:outbound-endpoint doc:name="JMS" queue="exception_queue"
                                           connector-ref="Exception-Email-Processing-JMS-Outbound-connector"/>
                </processor-chain>
            </when>
            <when expression="sessionVars['fileNamePrefix'] == '2$#^'">
                <processor-chain>
                    <logger doc:name="Logger" level="INFO" message="secondcondition executed"/>
                    <jms:outbound-endpoint doc:name="JMS" queue="exception_queue"
                                           connector-ref="Exception-Email-Processing-JMS-Outbound-connector"/>
                </processor-chain>
            </when>
            <otherwise>
                <processor-chain>
                    <logger doc:name="Logger" level="INFO" message="otherwisecondition executed"/>
                    <set-variable doc:name=" set transaction command"
                                  variableName="actionCommand"
                                  value="#[com.util.SympEnum.PROCESSED_XML]"/>
                    <component doc:name="Java" class="com.component.XmlLoggingComponent"/>
                    <jms:outbound-endpoint doc:name="JMS" queue="file_out_queue"
                                           connector-ref="File-Processing-JMS-Outbound-connector"/>
                    <jms:outbound-endpoint doc:name="JMS" queue="email_queue"
                                           connector-ref="Exception-Email-Processing-JMS-Outbound-connector"/>
                </processor-chain>
            </otherwise>
        </choice>

        <component doc:name="Exception Logging Component"
                   class="com.component.ExceptionLoggingComponent"/>
    </catch-exception-strategy>

    <!-- Branch 2 -->
    <catch-exception-strategy
            when="#[exception.causedBy(${symphony.exception.strategy.class.throwable})]">
        <logger doc:name="Logger" level="INFO" message="invalid condition executed"/>
        <set-variable doc:name=" set transaction command"
                      variableName="actionCommand"
                      value="#[com.util.SympEnum.VALID_XML]"/>
        <component doc:name="Java" class="com.component.XmlLoggingComponent"/>

        <!-- The isValid branch currently holds no processors; the other branch only logs. -->
        <choice doc:name="Choice">
            <when expression="sessionVars['isValid'] == true">
                <processor-chain>
                    <!--processing-->
                </processor-chain>
            </when>
            <otherwise>
                <processor-chain>
                    <logger doc:name="Logger" level="INFO"
                            message="XML is not Validate in XSD Validater The Control is in Exception Strategy Othere Wish part Of XSLT Applicablity "/>
                </processor-chain>
            </otherwise>
        </choice>

        <!-- The message goes to file_out_queue first, then an exception payload is
             generated and sent to email_queue. -->
        <jms:outbound-endpoint doc:name="JMS" queue="file_out_queue"
                               connector-ref="File-Processing-JMS-Outbound-connector"/>
        <component doc:name="Exception Generator Component"
                   class="com.component.ExceptionGeneratorComponent"/>
        <jms:outbound-endpoint doc:name="JMS" queue="email_queue"
                               connector-ref="Exception-Email-Processing-JMS-Outbound-connector"/>

        <logger doc:name="Logger" level="ERROR"
                message="Error Occured in File Named : #[header:originalFilename]"/>
        <!-- Groovy script that writes the exception and its stack trace to the log4j logger named "". -->
        <scripting:transformer doc:name="Add no availability error">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[org.apache.log4j.Logger.getLogger("").error("Error Trace is : \n",exception)]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
    </catch-exception-strategy>
</choice-exception-strategy>
<!-- Global Choice Exception Strategy -->
<!-- Global File Endpoints -->
<!-- HTTP connector whose session handler is overridden with NullSessionHandler so that
     session parameters are excluded from HTTP calls made through it. -->
<http:connector doc:name="HTTP\HTTPS" name="NoSessionConnector">
    <service-overrides sessionHandler="org.mule.session.NullSessionHandler"/>
</http:connector>
<!-- XA ActiveMQ connector used by the outbound endpoints that publish to
     file_in_queue and file_out_queue.
     NOTE(review): connectionFactory-ref is set, so the connection is presumably created
     from the amqFactory bean rather than from brokerURL; confirm the factory points at
     the intended broker. maxRedelivery="-1" presumably means unlimited; confirm. -->
<jms:activemq-xa-connector name="File-Processing-JMS-Outbound-connector"
                           doc:name="Active MQ"
                           connectionFactory-ref="amqFactory"
                           brokerURL="${jms.connector.brokerurl}"
                           validateConnections="${jms.connector.validateconnections}"
                           persistentDelivery="${jms.connector.persistentdelivery}"
                           maxRedelivery="-1">
    <dispatcher-threading-profile poolExhaustedAction="WAIT"
                                  maxThreadsActive="${file.processing.jms.connector.maxactivethreads}"
                                  maxBufferSize="${file.processing.jms.connector.maxbuffersize}"/>
    <reconnect-forever frequency="${jms.connector.reconnect.forever.frequency}"/>
</jms:activemq-xa-connector>
<!-- XA ActiveMQ connector used by the inbound endpoints that consume from
     file_in_queue and file_out_queue; receiver threading is enabled.
     NOTE(review): connectionFactory-ref is set, so the connection is presumably created
     from the amqFactory bean rather than from brokerURL; confirm the factory points at
     the intended broker. -->
<jms:activemq-xa-connector name="File-Processing-JMS-Inbound-connector"
                           doc:name="Active MQ"
                           connectionFactory-ref="amqFactory"
                           brokerURL="${jms.connector.brokerurl}"
                           validateConnections="${jms.connector.validateconnections}"
                           persistentDelivery="${jms.connector.persistentdelivery}"
                           maxRedelivery="-1">
    <receiver-threading-profile doThreading="true"
                                poolExhaustedAction="WAIT"
                                maxThreadsActive="${file.processing.jms.connector.maxactivethreads}"
                                maxBufferSize="${file.processing.jms.connector.maxbuffersize}"/>
    <reconnect-forever frequency="${jms.connector.reconnect.forever.frequency}"/>
</jms:activemq-xa-connector>
<!-- XA ActiveMQ connector used by the outbound endpoints that publish to
     exception_queue and email_queue.
     NOTE(review): maxThreadsActive reads the exception.email.* property while
     maxBufferSize reads the file.processing.* one; confirm that is intentional. -->
<jms:activemq-xa-connector name="Exception-Email-Processing-JMS-Outbound-connector"
                           doc:name="Active MQ"
                           connectionFactory-ref="amqFactory"
                           brokerURL="${jms.connector.brokerurl}"
                           validateConnections="${jms.connector.validateconnections}"
                           persistentDelivery="${jms.connector.persistentdelivery}"
                           maxRedelivery="-1">
    <dispatcher-threading-profile poolExhaustedAction="WAIT"
                                  maxThreadsActive="${exception.email.processing.jms.connector.maxactivethreads}"
                                  maxBufferSize="${file.processing.jms.connector.maxbuffersize}"/>
    <reconnect-forever frequency="${jms.connector.reconnect.forever.frequency}"/>
</jms:activemq-xa-connector>
<!-- XA ActiveMQ connector used by the inbound endpoint that consumes from
     exception_queue; receiver threading is enabled.
     NOTE(review): maxThreadsActive reads the exception.email.* property while
     maxBufferSize reads the file.processing.* one; confirm that is intentional. -->
<jms:activemq-xa-connector name="Exception-Email-Processing-JMS-Inbound-connector"
                           doc:name="Active MQ"
                           connectionFactory-ref="amqFactory"
                           brokerURL="${jms.connector.brokerurl}"
                           validateConnections="${jms.connector.validateconnections}"
                           persistentDelivery="${jms.connector.persistentdelivery}"
                           maxRedelivery="-1">
    <receiver-threading-profile doThreading="true"
                                poolExhaustedAction="WAIT"
                                maxThreadsActive="${exception.email.processing.jms.connector.maxactivethreads}"
                                maxBufferSize="${file.processing.jms.connector.maxbuffersize}"/>
    <reconnect-forever frequency="${jms.connector.reconnect.forever.frequency}"/>
</jms:activemq-xa-connector>
<!-- File connector for the input directory; the default message receiver is replaced
     by the project class com.util.InputFileMessageReceiver. -->
<file:connector doc:name="File" name="inputFileConnector">
    <service-overrides messageReceiver="com.util.InputFileMessageReceiver"/>
</file:connector>
<!-- JBoss Transactions manager backing the xa-transaction elements used in the flows. -->
<jbossts:transaction-manager/>
<!-- Flow 1: picks up files from the input directory, stores the file name and the
     original XML in session variables, runs XmlLoggingComponent and XMLSchemaValidator,
     then publishes the message to file_in_queue. No transaction is configured here:
     both xa-transaction elements are commented out. -->
<flow name="input_file_to_mule_in_jms" doc:name="input_file_to_mule_in_jms"
      processingStrategy="synchronous">

    <file:inbound-endpoint doc:name="File"
                           connector-ref="inputFileConnector"
                           path="${symphony.order.input.directory}"
                           moveToDirectory="${symphony.order.processed.directory}"
                           comparator="${symphony.file.comparator}"
                           fileAge="${symphony.file.age}"
                           pollingFrequency="${symphony.inbox.file.polling.frequency}">
        <!--<xa-transaction action="ALWAYS_BEGIN"/> -->
    </file:inbound-endpoint>

    <byte-array-to-object-transformer doc:name="Byte Array to Object"/>

    <!-- Remember the inbound file name for later flows (it is read again when the
         output file is written). -->
    <set-session-variable doc:name="set original file name"
                          variableName="orderFileName"
                          value="#[header:INBOUND:originalFilename]"/>
    <logger doc:name="Logger" level="INFO"
            message="original file name is : #[sessionVars['orderFileName']]"/>

    <!-- Keep the untouched payload; the exception strategy restores it from this variable. -->
    <set-session-variable doc:name="Original XML Order"
                          variableName="originalXML"
                          value="#[message:payload]"/>

    <set-variable doc:name=" set transaction command"
                  variableName="actionCommand"
                  value="#[com.util.SympEnum.NEW_XML]"/>
    <component doc:name="Xml Logging Component"
               class="com.component.XmlLoggingComponent"/>
    <component doc:name="XML Schema Validator"
               class="com.component.XMLSchemaValidator"/>

    <jms:outbound-endpoint doc:name="JMS" queue="file_in_queue"
                           connector-ref="File-Processing-JMS-Outbound-connector">
        <!--<xa-transaction action="ALWAYS_JOIN"/> -->
    </jms:outbound-endpoint>

    <exception-strategy doc:name="Reference Exception Strategy"
                        ref="Choice_Exception_Strategy"/>
</flow>
<!-- Flow 2: consumes from file_in_queue and exception_queue, each inbound endpoint
     starting an XA transaction. Depending on ${project.http.call.enable} it either
     performs an HTTP lookup and branches on the isPilotCustomer result, or goes
     straight to the bussnesslogic-project sub-flow. -->
<flow name="improved-project" doc:name="improved-project" processingStrategy="synchronous">

    <composite-source doc:name="Composite Source">
        <jms:inbound-endpoint doc:name="JMS" queue="file_in_queue"
                              connector-ref="File-Processing-JMS-Inbound-connector">
            <xa-transaction action="ALWAYS_BEGIN"/>
        </jms:inbound-endpoint>
        <jms:inbound-endpoint doc:name="JMS" queue="exception_queue"
                              connector-ref="Exception-Email-Processing-JMS-Inbound-connector">
            <xa-transaction action="ALWAYS_BEGIN"/>
            <!--<xa-transaction action="ALWAYS_BEGIN" timeout="35000"/> -->
        </jms:inbound-endpoint>
    </composite-source>

    <logger doc:name="Logger" level="INFO"
            message="original file name is : #[sessionVars['orderFileName']]"/>
    <!-- <set-session-variable variableName="originalXML" value="#[message:payload]"
         doc:name="Original XML Order" /> -->

    <custom-transformer doc:name="xml to object"
                        class="com.transformer.XmlToObjectTransformer"/>
    <logger doc:name="Logger" level="INFO"
            message="/Message/MessageInfo/MessageSender ID : #[sessionVars['messageObject'].getMessageInfo().getMessageSender()]"/>

    <!-- Keep the transformed payload; the sub-flow restores it from orderPayload. -->
    <set-session-variable doc:name="Session Variable"
                          variableName="orderPayload"
                          value="#[message:payload]"/>
    <set-session-variable doc:name="Http Enable / Disable"
                          variableName="symphHttpEnableDisable"
                          value="#[${project.http.call.enable}]"/>

    <choice doc:name="Choice">
        <!-- HTTP lookup enabled. -->
        <when expression="sessionVars['symphHttpEnableDisable'] == true">
            <processor-chain>
                <!-- condition/result are set only for CheckPointLoggingComponent and
                     removed again right after it. -->
                <set-session-variable doc:name="http Call Enable Disable Condition"
                                      variableName="condition"
                                      value="sessionVars['symphHttpEnableDisable'] == true"/>
                <set-session-variable doc:name="http Call Enable Disable Result"
                                      variableName="result"
                                      value="true"/>
                <component doc:name="check point logging"
                           doc:description="check point logging"
                           class="com.component.CheckPointLoggingComponent"/>
                <remove-session-variable doc:name="Remove Session Variable"
                                         variableName="condition"/>
                <remove-session-variable doc:name="Remove Session Variable"
                                         variableName="result"/>

                <http:outbound-endpoint doc:name="HTTP"
                                        connector-ref="NoSessionConnector"
                                        exchange-pattern="request-response"
                                        method="GET"
                                        host="${project.http.call.host}"
                                        port="${project.http.call.port}"
                                        path="${project.http.call.path}=#[sessionVars['messageObject'].getMessageInfo().getMessageSender()]"/>
                <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
                <logger doc:name="Logger" level="INFO"
                        message="After Http Call : #[message:payload]"/>

                <set-session-variable doc:name="Pilot Customer Status"
                                      variableName="isPilotCustomer"
                                      value="#[xpath:${project.http.call.tradingpartner.status.xpath}]"/>
                <logger doc:name="Logger" level="INFO"
                        message="XML Response Xpath Value : #[sessionVars['isPilotCustomer']]"/>

                <choice doc:name="Choice">
                    <!-- Pilot customer: continue with the business logic sub-flow. -->
                    <when expression="sessionVars['isPilotCustomer'] == true">
                        <processor-chain>
                            <logger doc:name="Logger" level="INFO"
                                    message="isPilotCustomer check Point pass"/>
                            <set-session-variable doc:name="http Call Condition"
                                                  variableName="condition"
                                                  value="sessionVars['isPilotCustomer'] == true"/>
                            <set-session-variable doc:name="http Call Result"
                                                  variableName="result"
                                                  value="true"/>
                            <component doc:name="check point logging"
                                       doc:description="check point logging"
                                       class="com.component.CheckPointLoggingComponent"/>
                            <remove-session-variable doc:name="Remove Session Variable"
                                                     variableName="condition"/>
                            <remove-session-variable doc:name="Remove Session Variable"
                                                     variableName="result"/>
                            <flow-ref doc:name="bussnesslogic-project"
                                      name="bussnesslogic-project"/>
                        </processor-chain>
                    </when>
                    <!-- Not a pilot customer: convert back to XML and send directly to file_out_queue. -->
                    <otherwise>
                        <processor-chain>
                            <set-session-variable doc:name="http Call Condition"
                                                  variableName="condition"
                                                  value="sessionVars['isPilotCustomer'] == true"/>
                            <set-session-variable doc:name="http Call Result"
                                                  variableName="result"
                                                  value="false"/>
                            <component doc:name="check point logging"
                                       doc:description="check point logging"
                                       class="com.component.CheckPointLoggingComponent"/>
                            <remove-session-variable doc:name="Remove Session Variable"
                                                     variableName="condition"/>
                            <remove-session-variable doc:name="Remove Session Variable"
                                                     variableName="result"/>
                            <custom-transformer doc:name="Object To Xml"
                                                class="com.transformer.ObjectToXmlTransformer"/>
                            <jms:outbound-endpoint doc:name="JMS" queue="file_out_queue"
                                                   connector-ref="File-Processing-JMS-Outbound-connector"/>
                        </processor-chain>
                    </otherwise>
                </choice>
            </processor-chain>
        </when>

        <!-- HTTP lookup disabled: log the check point and run the business logic sub-flow. -->
        <otherwise>
            <processor-chain>
                <logger doc:name="Logger" level="INFO"
                        message="WithOut Http Call Flow Is ...."/>
                <set-session-variable doc:name="http Call Enable Disable Condition"
                                      variableName="condition"
                                      value="sessionVars['symphHttpEnableDisable'] == true"/>
                <set-session-variable doc:name="http Call Enable Disable Result"
                                      variableName="result"
                                      value="false"/>
                <component doc:name="check point logging"
                           doc:description="check point logging"
                           class="com.component.CheckPointLoggingComponent"/>
                <remove-session-variable doc:name="Remove Session Variable"
                                         variableName="condition"/>
                <remove-session-variable doc:name="Remove Session Variable"
                                         variableName="result"/>
                <flow-ref doc:name="bussnesslogic-project" name="bussnesslogic-project"/>
            </processor-chain>
        </otherwise>
    </choice>

    <exception-strategy doc:name="Reference Exception Strategy"
                        ref="Choice_Exception_Strategy"/>
</flow>
<!-- Business logic sub-flow invoked from improved-project. It restores the payload saved
     in orderPayload, runs the RBO info and part-info components, logs a check point,
     splits the order, and then, between a BEGIN and a COMMIT command passed to
     TransactionComponent, logs the order and publishes it as XML to file_out_queue,
     joining the XA transaction started by the calling flow's inbound endpoint. -->
<sub-flow name="bussnesslogic-project" doc:name="bussnesslogic-project">
    <set-payload doc:name="Set Payload" value="#[sessionVars['orderPayload']]"/>
    <logger doc:name="Logger" level="INFO"
            message="new message payload: #[message:payload]"/>
    <logger doc:name="Logger" level="INFO"
            message="After Set Session Var Object in Payload The PayLoad Value Is : #[message:payload]"/>

    <!-- The spring-object references below use the bean ids declared in spring:beans
         (symphonyInfoResourceComponent / symphonyPartInfoComponent). The previous
         project* names are not defined anywhere in this file. -->
    <component doc:name="RBO project Info Component">
        <spring-object bean="symphonyInfoResourceComponent"/>
    </component>
    <logger doc:name="Logger" level="INFO"
            message="After Splitting order The PayLoad Value Is : #[message:payload]"/>

    <component doc:name="RBO project Part Info Component">
        <spring-object bean="symphonyPartInfoComponent"/>
    </component>
    <logger doc:name="Logger" level="INFO"
            message="After doing DropShip or Available Stock Check, The PayLoad Value Is : #[message:payload]"/>

    <component doc:name="check point logging"
               doc:description="Check point logging"
               class="com.component.CheckPointLoggingComponent"/>

    <component doc:name="Order Splitting Component After DropShip and Available Stock Check">
        <spring-object bean="orderSplittingComponent"/>
    </component>
    <logger doc:name="Logger" level="INFO"
            message="After Splitting order at last The PayLoad Value Is : #[message:payload]"/>

    <!-- BEGIN command for TransactionComponent. -->
    <set-variable doc:name=" set transaction command"
                  variableName="transactionCommand"
                  value="#[com.util.TransactionType.BEGIN]"/>
    <component doc:name="Begin transaction"
               class="com.component.TransactionComponent"/>

    <component doc:name="order logging"
               class="com.component.MessageLoggingComponent"/>
    <custom-transformer doc:name="Object To Xml Transformer"
                        class="com.transformer.ObjectToXmlTransformer"/>

    <jms:outbound-endpoint doc:name="JMS" queue="file_out_queue"
                           connector-ref="File-Processing-JMS-Outbound-connector">
        <xa-transaction action="ALWAYS_JOIN" timeout="35000"/>
    </jms:outbound-endpoint>

    <!-- COMMIT command for TransactionComponent. -->
    <set-variable doc:name=" set transaction command"
                  variableName="transactionCommand"
                  value="#[com.util.TransactionType.COMMIT]"/>
    <component doc:name="Commit transaction"
               class="com.component.TransactionComponent"/>
</sub-flow>
<!-- Flow 3: consumes from file_out_queue inside an XA transaction, writes the payload
     to the output directory under the file name kept in the orderFileName session
     property, then runs XmlLoggingComponent with the OUT_XML action command. -->
<flow name="mule_out_jms_to_output_file" doc:name="mule_out_jms_to_output_file"
      processingStrategy="synchronous">

    <jms:inbound-endpoint doc:name="JMS" queue="file_out_queue"
                          connector-ref="File-Processing-JMS-Inbound-connector">
        <xa-transaction action="ALWAYS_BEGIN"/>
    </jms:inbound-endpoint>

    <logger doc:name="Logger" level="INFO"
            message="activemq_to_fileFlow1 : #[message:payload]"/>

    <file:outbound-endpoint doc:name="File"
                            path="${project.order.output.directory}"
                            outputPattern="#[header:SESSION:orderFileName]"
                            responseTimeout="10000"/>

    <set-variable doc:name=" set transaction command"
                  variableName="actionCommand"
                  value="#[com.util.SympEnum.OUT_XML]"/>
    <component doc:name="Java" class="com.component.XmlLoggingComponent"/>

    <!-- doc:name is kept on a single line; the original value contained a line break. -->
    <exception-strategy doc:name="Reference Exception Strategy"
                        ref="Choice_Exception_Strategy"/>
</flow>
</mule>
这是我目前的配置。当我停止 Mule 应用程序并重新启动时,它工作正常:它会从队列中取出待处理的消息并正确处理。但是,当我关闭 ActiveMQ 代理再重新启动它时,就不能正常工作了;在这种情况下我必须同时重启我的应用程序,而且即便如此,仍然不能处理完所有文件。