我将 WSO2 ESB 4.9.0 与 MB 3.1.0 通过 JMS 配合使用。我需要接收客户端的请求,快速给出响应,然后再尝试执行付款。如果发生错误或后端服务尚未准备好应答而需要重发请求,我就使用 JMS 队列。但队列有时会停止工作。我在日志中看到如下内容:
TID[-1234] [MB] [2016-05-04 19:40:13,933] ERROR {org.wso2.andes.server.protocol.AMQProtocolEngine} - Unexpected exception while processing frame. Closing connection.
org.wso2.andes.amqp.QpidAndesBridge.rejectMessage(QpidAndesBridge.java:259)
org.wso2.andes.server.handler.BasicRejectMethodHandler.methodReceived(BasicRejectMethodHandler.java:115)
org.wso2.andes.server.handler.ServerMethodDispatcherImpl.dispatchBasicReject(ServerMethodDispatcherImpl.java:263)
org.wso2.andes.framing.amqp_0_91.BasicRejectBodyImpl.execute(BasicRejectBodyImpl.java:126)
org.wso2.andes.server.state.AMQStateManager.methodReceived(AMQStateManager.java:169)
org.wso2.andes.server.protocol.AMQProtocolEngine.methodFrameReceived(AMQProtocolEngine.java:388)
org.wso2.andes.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:96)
org.wso2.andes.server.protocol.AMQProtocolEngine.frameReceived(AMQProtocolEngine.java:333)
org.wso2.andes.server.protocol.AMQProtocolEngine.dataBlockReceived(AMQProtocolEngine.java:282)
org.wso2.andes.server.protocol.AMQProtocolEngine$1.run(AMQProtocolEngine.java:251)
org.wso2.andes.pool.Job.processAll(Job.java:109)
org.wso2.andes.pool.Job.run(Job.java:153)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
TID[-1234] [ESB] [2016-05-04 19:40:21,717] ERROR {org.apache.axis2.transport.jms.ServiceTaskManager} - Error committing local session txn for message : ID:5cb30d8e-c1c4-3044-8f6c-6230ac17f827
org.wso2.andes.client.AMQSession.checkNotClosed(AMQSession.java:655)
org.wso2.andes.client.AMQSession.getTransacted(AMQSession.java:1707)
org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.handleMessage(ServiceTaskManager.java:593)
org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:468)
org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
我哪里做错了?
下面是我的 ESB 配置:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  CyberPlat_Tele2Proxy: HTTP/HTTPS entry point.
  Flow of the in-sequence:
    1. copy request data into message-context properties
       (vId, vStep, vCountAttempts, vIsRepeat, vPayload);
    2. build a fixed AddPaymentResponse/Success payload and dispatch it with <call/>
       flagged as a response (the "fast answer to client");
    3. run the ItemQueue sequence, then the CyberPlat_Main sequence.
  Any fault is delegated to the CyberPlat_Attempts sequence.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="CyberPlat_Tele2Proxy"
       transports="http https"
       startOnLoad="true"
       trace="disable">
  <target>
    <inSequence>
      <!-- Item id taken from the K_ID attribute of the rowset row in the request. -->
      <property name="vId"
                scope="default"
                type="STRING"
                expression="a:xml/rs:data/z:row/@K_ID"
                xmlns:a="http://ProxyService"
                xmlns:rs="urn:schemas-microsoft-com:rowset"
                xmlns:z="#RowsetSchema"/>
      <!-- Initial state of a new work item: step 1, no attempts yet, not a repeat. -->
      <property name="vStep" scope="default" type="STRING" value="1"/>
      <property name="vCountAttempts" scope="default" type="STRING" value="0"/>
      <property name="vIsRepeat" scope="default" type="STRING" value="0"/>
      <!-- Keep the original request element as an OM node before the payload is replaced below. -->
      <property name="vPayload"
                scope="default"
                type="OM"
                expression="//a:xml"
                xmlns:a="http://ProxyService"/>

      <!-- ========== The fast answer to client ========== -->
      <payloadFactory media-type="xml">
        <format>
          <AddPaymentResponse xmlns="http://ProxyService">
            <AddPaymentResult>
              <State>Success</State>
            </AddPaymentResult>
          </AddPaymentResponse>
        </format>
        <args/>
      </payloadFactory>
      <!-- One-way dispatch with the To header removed and RESPONSE=true, i.e. sent as the reply. -->
      <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
      <header name="To" action="remove" scope="default"/>
      <property name="RESPONSE" scope="default" type="STRING" value="true"/>
      <call/>
      <!-- ========================================== -->

      <!-- Rebuild the payload as an itemQueue document from the v* properties. -->
      <sequence key="ItemQueue"/>
      <!-- Switch back to request/response mode before the main processing. -->
      <property name="OUT_ONLY" scope="default" type="STRING" value="false"/>
      <sequence key="CyberPlat_Main"/>
    </inSequence>
    <outSequence/>
    <faultSequence>
      <sequence key="CyberPlat_Attempts"/>
    </faultSequence>
  </target>
</proxy>
使用 JMS 队列的代理:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  CyberPalt_JMSProxy: JMS-transport proxy bound to the CyberPalt_Queue queue.
  Each received message is processed by the CyberPlat_Main sequence.
  On a fault it logs an error line, runs CyberPlat_Attempts, and then sends an
  <empty/> payload flagged as a response.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="CyberPalt_JMSProxy"
       transports="jms"
       startOnLoad="true"
       trace="disable">
  <target>
    <inSequence>
      <sequence key="CyberPlat_Main"/>
    </inSequence>
    <outSequence/>
    <faultSequence>
      <log level="custom" category="ERROR">
        <property name="Id" expression="get-property('vId')"/>
        <property name="Proxy 'CyberPalt_JMSProxy'" value="*********** FAULT ***********"/>
      </log>
      <!-- Re-queue the item or mark it failed, depending on the attempt counter. -->
      <sequence key="CyberPlat_Attempts"/>
      <!-- Replace the payload with <empty/> and send it as a response. -->
      <header name="To" action="remove" scope="default"/>
      <property name="RESPONSE" scope="default" type="STRING" value="true"/>
      <payloadFactory media-type="xml">
        <format>
          <empty/>
        </format>
        <args/>
      </payloadFactory>
      <send/>
    </faultSequence>
  </target>

  <!-- JMS listener settings for this proxy. -->
  <parameter name="transport.jms.DestinationType">queue</parameter>
  <!-- NOTE(review): a durable subscriber name normally applies to topic subscriptions;
       with DestinationType=queue it is presumably unused. Confirm before relying on it. -->
  <parameter name="transport.jms.DurableSubscriberName">CyberPalt_JMSProxy</parameter>
  <parameter name="transport.jms.Destination">CyberPalt_Queue</parameter>
  <parameter name="transport.jms.ConcurrentConsumers">3</parameter>
  <!-- Content type: taken from the contentType JMS property, falling back to text/xml. -->
  <parameter name="transport.jms.ContentType">
    <rules>
      <jmsProperty>contentType</jmsProperty>
      <default>text/xml</default>
    </rules>
  </parameter>
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <parameter name="transport.jms.MaxConcurrentConsumers">3</parameter>
</proxy>
这是我的主序列:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  CyberPlat_Main: processes one work item described by an itemQueue payload.
  The item's fields are copied into message-context properties, then the step
  filters run in order. A step that completes advances vStep, so the next
  filter in the same pass picks the item up:
    step 1: CreatePayment, then vStep becomes 2
    step 2: ConfirmPayment; on "RepeatStatus" the item is re-queued over JMS,
            otherwise the success fields are set and vStep becomes 3
    step 3: ChangeStatus_ItemQueue and ChangeStatus_Payment
-->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="CyberPlat_Main" trace="disable">
  <property name="NO_KEEPALIVE" scope="axis2" type="STRING" value="true"/>

  <!-- ========== Id ========== -->
  <property name="vId"
            scope="default"
            type="STRING"
            expression="//q:itemQueue/q:id"
            xmlns:q="http://QueueService"/>
  <log level="custom">
    <property name="Id" expression="get-property('vId')"/>
    <property name="Sequence 'CyberPlat_Main'" value="*********** BEGIN CYBERPALT***********"/>
  </log>

  <!-- ========== Step ========== -->
  <property name="vStep"
            scope="default"
            type="STRING"
            expression="//q:itemQueue/q:step"
            xmlns:q="http://QueueService"/>

  <!-- ========== CountAttempts ========== -->
  <!-- Read the counter from the payload and add one for this pass.
       The path is evaluated directly: get-property() expects a property NAME,
       so passing it the XPath text could never return the payload value. -->
  <property name="vCountAttempts"
            scope="default"
            type="STRING"
            expression="number(//q:itemQueue/q:countAttempts)+1"
            xmlns:q="http://QueueService"/>

  <!-- ========== Payload ========== -->
  <property name="vPayload"
            scope="default"
            type="OM"
            expression="//p:payload"
            xmlns:p="http://PayloadService"/>

  <!-- ========== MaxAttempts ========== -->
  <property name="vMaxAttempts" scope="default" type="STRING" value="3"/>

  <!-- ========== IsRepeat ========== -->
  <property name="vIsRepeat"
            scope="default"
            type="STRING"
            expression="//q:itemQueue/q:isRepeat"
            xmlns:q="http://QueueService"/>

  <!-- Step1 -->
  <filter source="$ctx:vStep" regex="1">
    <then>
      <sequence key="CreatePayment"/>
      <property name="vStep" scope="default" type="STRING" value="2"/>
    </then>
    <else/>
  </filter>

  <!-- Step2 -->
  <filter source="$ctx:vStep" regex="2">
    <then>
      <sequence key="ConfirmPayment"/>
      <!-- ======== Pay check result ======== -->
      <property name="vStatus"
                scope="default"
                type="STRING"
                expression="//a:PayResponse/a:CpStatus/text()"
                xmlns:a="http://CyberPlatService"/>
      <filter source="$ctx:vStatus" regex="RepeatStatus">
        <then>
          <!-- Need repeat request a status of payment:
               flag the item as a repeat, rebuild the itemQueue payload and
               put it back on the JMS queue as a one-way message. -->
          <property name="vIsRepeat" scope="default" type="STRING" value="1"/>
          <sequence key="ItemQueue"/>
          <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
          <call>
            <endpoint>
              <!-- Query-string separators are written as &amp; because a raw
                   ampersand is not allowed inside an XML attribute value. -->
              <address trace="disable" uri="jms:/CyberPalt_Queue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
          </call>
        </then>
        <else>
          <log level="custom">
            <property name="Id" expression="get-property('vId')"/>
            <property name="Sequence 'CyberPlat_Main'" value="*********** Payment is successful! ***********"/>
          </log>
          <property name="vOnlineStatus" scope="default" type="STRING" value="0"/>
          <property name="vErrorCode" scope="default" type="STRING" value="0"/>
          <property name="vErrorText" scope="default" type="STRING" value="OK"/>
          <property name="vStep" scope="default" type="STRING" value="3"/>
        </else>
      </filter>
    </then>
    <else/>
  </filter>

  <!-- Step3 End -->
  <filter source="$ctx:vStep" regex="3">
    <then>
      <sequence key="ChangeStatus_ItemQueue"/>
      <sequence key="ChangeStatus_Payment"/>
    </then>
    <else/>
  </filter>

  <log level="custom">
    <property name="Id" expression="get-property('vId')"/>
    <property name="Sequence 'CyberPlat_Main'" value="*********** END CYBERPALT ***********"/>
  </log>
</sequence>
这是处理重试次数的序列:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  CyberPlat_Attempts: fault handling based on the attempt counter.
  While vMaxAttempts >= vCountAttempts the item is flagged as a repeat, rebuilt
  by the ItemQueue sequence and sent back to the JMS queue as a one-way message.
  Otherwise the error fields are set and the ChangeStatus_* sequences run.
  NOTE(review): because the comparison is ">=", an item whose counter equals the
  maximum is queued once more. Confirm that this is the intended limit.
-->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="CyberPlat_Attempts" trace="disable">
  <log level="custom">
    <property name="Id" expression="get-property('vId')"/>
    <property name="MaxAttempts" expression="get-property('vMaxAttempts')"/>
    <property name="CountAttempts" expression="get-property('vCountAttempts')"/>
  </log>
  <filter xpath="number($ctx:vMaxAttempts) >= number($ctx:vCountAttempts)">
    <then>
      <log level="custom" category="ERROR">
        <property name="Id" expression="get-property('vId')"/>
        <property name="CountAttempts" expression="get-property('vCountAttempts')"/>
        <!-- NOTE(review): vMessage is not set by any configuration shown alongside
             this sequence; presumably it is populated elsewhere. Verify. -->
        <property name="Message" expression="$ctx:vMessage"/>
      </log>
      <property name="vIsRepeat" scope="default" type="STRING" value="1"/>
      <sequence key="ItemQueue"/>
      <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
      <call>
        <endpoint>
          <!-- Query-string separators are written as &amp; because a raw
               ampersand is not allowed inside an XML attribute value. -->
          <address trace="disable" uri="jms:/CyberPalt_Queue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue"/>
        </endpoint>
      </call>
    </then>
    <else>
      <!-- Attempts exhausted: record status/error code 3 and the fault's ERROR_MESSAGE. -->
      <property name="vOnlineStatus" scope="default" type="STRING" value="3"/>
      <property name="vErrorCode" scope="default" type="STRING" value="3"/>
      <property name="vErrorText" scope="default" type="STRING" expression="get-property('ERROR_MESSAGE')"/>
      <sequence key="ChangeStatus_ItemQueue"/>
      <sequence key="ChangeStatus_Payment"/>
    </else>
  </filter>
</sequence>
构造有效负载的序列:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ItemQueue: replaces the current payload with an itemQueue document built from
  the message-context properties vStep, vId, vCountAttempts, vIsRepeat and vPayload.
  Placeholders $1..$5 are filled from the <arg> entries in the order listed.
-->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="ItemQueue" trace="disable">
  <payloadFactory media-type="xml">
    <format>
      <itemQueue xmlns="http://QueueService">
        <step>$1</step>
        <id>$2</id>
        <countAttempts>$3</countAttempts>
        <isRepeat>$4</isRepeat>
        $5
      </itemQueue>
    </format>
    <args>
      <!-- $1 --> <arg expression="get-property('vStep')" evaluator="xml"/>
      <!-- $2 --> <arg expression="get-property('vId')" evaluator="xml"/>
      <!-- $3 --> <arg expression="get-property('vCountAttempts')" evaluator="xml"/>
      <!-- $4 --> <arg expression="get-property('vIsRepeat')" evaluator="xml"/>
      <!-- $5: the OM node stored in vPayload, inserted after the scalar fields -->
      <arg expression="$ctx:vPayload" evaluator="xml"/>
    </args>
  </payloadFactory>
</sequence>
我的 JMS 传输设置:
<!-- JMS connection factory definition named "myQueueConnectionFactory". -->
<parameter name="myQueueConnectionFactory" locked="false">
  <!-- JNDI lookup: initial context factory, provider URL and the factory's JNDI name. -->
  <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
  <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
  <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
  <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
  <!-- NOTE(review): a transacted session is combined with CLIENT_ACKNOWLEDGE here;
       JMS providers generally ignore the acknowledgement mode on transacted
       sessions. Confirm which of the two behaviours is intended. -->
  <parameter name="transport.jms.SessionTransacted">true</parameter>
  <parameter name="transport.jms.SessionAcknowledgement" locked="true">CLIENT_ACKNOWLEDGE</parameter>
  <!-- Consumer pool size: initial and maximum are both 100. -->
  <parameter name="transport.jms.ConcurrentConsumers" locked="true">100</parameter>
  <parameter name="transport.jms.MaxConcurrentConsumers" locked="true">100</parameter>
</parameter>