1

我通过 JMS 将 ESB 4.9.0 与 MB 3.1.0 配合使用。我需要接收客户端的请求,快速给出响应,然后再尝试付款。如果发生错误或后端服务尚未准备好应答而需要重试请求,我会使用 JMS 队列。但队列有时会停止工作。我在日志中看到如下内容:

TID[-1234] [MB] [2016-05-04 19:40:13,933] ERROR {org.wso2.andes.server.protocol.AMQProtocolEngine} - Unexpected exception while processing frame. Closing connection.
 org.wso2.andes.amqp.QpidAndesBridge.rejectMessage(QpidAndesBridge.java:259)
 org.wso2.andes.server.handler.BasicRejectMethodHandler.methodReceived(BasicRejectMethodHandler.java:115)
 org.wso2.andes.server.handler.ServerMethodDispatcherImpl.dispatchBasicReject(ServerMethodDispatcherImpl.java:263)
 org.wso2.andes.framing.amqp_0_91.BasicRejectBodyImpl.execute(BasicRejectBodyImpl.java:126)
 org.wso2.andes.server.state.AMQStateManager.methodReceived(AMQStateManager.java:169)
 org.wso2.andes.server.protocol.AMQProtocolEngine.methodFrameReceived(AMQProtocolEngine.java:388) org.wso2.andes.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:96)
 org.wso2.andes.server.protocol.AMQProtocolEngine.frameReceived(AMQProtocolEngine.java:333)
 org.wso2.andes.server.protocol.AMQProtocolEngine.dataBlockReceived(AMQProtocolEngine.java:282)
 org.wso2.andes.server.protocol.AMQProtocolEngine$1.run(AMQProtocolEngine.java:251) org.wso2.andes.pool.Job.processAll(Job.java:109)
 org.wso2.andes.pool.Job.run(Job.java:153) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745

TID[-1234] [ESB] [2016-05-04 19:40:21,717] ERROR {org.apache.axis2.transport.jms.ServiceTaskManager} - Error committing local session txn for message :
 ID:5cb30d8e-c1c4-3044-8f6c-6230ac17f827 org.wso2.andes.client.AMQSession.checkNotClosed(AMQSession.java:655)
 org.wso2.andes.client.AMQSession.getTransacted(AMQSession.java:1707)
 org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.handleMessage(ServiceTaskManager.java:593)
 org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:468)
 org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)

我哪里做错了?

这就是我设置 ESB 的方式

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Proxy "CyberPlat_Tele2Proxy", exposed on the http and https transports.

  In-sequence flow:
    1. Capture the request id (K_ID attribute) and the a:xml payload, and
       initialise the step / attempt / repeat properties.
    2. Replace the body with a canned "Success" AddPaymentResponse and hand it
       to the call mediator as a response (the "fast answer to client").
    3. Build an itemQueue message from the captured properties (sequence
       "ItemQueue") and run the payment flow (sequence "CyberPlat_Main").

  Faults are delegated to the sequence "CyberPlat_Attempts".
-->
<proxy name="CyberPlat_Tele2Proxy" startOnLoad="true" trace="disable"
  transports="http https" xmlns="http://ws.apache.org/ns/synapse">
  <target>
    <inSequence>
      <!-- The path is anchored with // in the same way as the vPayload
           expression below. Synapse evaluates XPath with the SOAP envelope as
           the context node, so a relative path starting at a:xml would only
           look at direct children of the envelope. -->
      <property expression="//a:xml/rs:data/z:row/@K_ID" name="vId" scope="default" type="STRING" xmlns:a="http://ProxyService"
        xmlns:rs="urn:schemas-microsoft-com:rowset" xmlns:z="#RowsetSchema"/>
      <property name="vStep" scope="default" type="STRING" value="1"/>
      <property name="vCountAttempts" scope="default" type="STRING" value="0"/>
      <property name="vIsRepeat" scope="default" type="STRING" value="0"/>
      <property expression="//a:xml" name="vPayload" scope="default" type="OM" xmlns:a="http://ProxyService"/>
      <!-- ========== The fast answer to client ========== -->
      <payloadFactory media-type="xml">
        <format>
          <AddPaymentResponse xmlns="http://ProxyService">
            <AddPaymentResult>
              <State>Success</State>
            </AddPaymentResult>
          </AddPaymentResponse>
        </format>
        <args/>
      </payloadFactory>
      <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
      <header action="remove" name="To" scope="default"/>
      <property name="RESPONSE" scope="default" type="STRING" value="true"/>
      <call/>
      <!-- NOTE(review): OUT_ONLY is reset to false below, but RESPONSE stays
           "true" for the rest of the flow. Confirm that the sequences invoked
           from here on are not affected by it. -->
      <!-- ========================================== -->
      <sequence key="ItemQueue"/>
      <property name="OUT_ONLY" scope="default" type="STRING" value="false"/>
      <sequence key="CyberPlat_Main"/>
    </inSequence>
    <outSequence/>
    <faultSequence>
      <sequence key="CyberPlat_Attempts"/>
    </faultSequence>
  </target>
</proxy>

使用 jms 队列的代理

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Proxy "CyberPalt_JMSProxy", exposed on the jms transport only.

  It listens on the destination CyberPalt_Queue (type: queue) and passes every
  message to the sequence "CyberPlat_Main". On a fault it logs an error line,
  runs the sequence "CyberPlat_Attempts", then replaces the body with an
  <empty/> element and sends it as a response.

  NOTE(review): transport.jms.DurableSubscriberName is set although the
  destination type is queue. Durable subscriptions are a topic concept in JMS;
  confirm whether this parameter is needed.
  NOTE(review): no transport.jms.ConnectionFactory parameter is given here, so
  which connection factory of the JMS listener is used cannot be told from
  this definition. Verify it is the intended one.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="CyberPalt_JMSProxy"
       transports="jms"
       startOnLoad="true"
       trace="disable">
  <target>
    <!-- Normal path: the queued item is processed by the main flow. -->
    <inSequence>
      <sequence key="CyberPlat_Main"/>
    </inSequence>
    <outSequence/>
    <!-- Error path: log, let the attempts sequence decide what happens to the
         item, then answer with an empty payload. -->
    <faultSequence>
      <log level="custom" category="ERROR">
        <property name="Id" expression="get-property('vId')"/>
        <property name="Proxy 'CyberPalt_JMSProxy'" value="*********** FAULT ***********"/>
      </log>
      <sequence key="CyberPlat_Attempts"/>
      <header name="To" action="remove" scope="default"/>
      <property name="RESPONSE" value="true" scope="default" type="STRING"/>
      <payloadFactory media-type="xml">
        <format>
          <empty/>
        </format>
        <args/>
      </payloadFactory>
      <send/>
    </faultSequence>
  </target>
  <!-- Destination to consume from -->
  <parameter name="transport.jms.Destination">CyberPalt_Queue</parameter>
  <parameter name="transport.jms.DestinationType">queue</parameter>
  <parameter name="transport.jms.DurableSubscriberName">CyberPalt_JMSProxy</parameter>
  <!-- Content type: taken from the JMS property "contentType" when present,
       text/xml otherwise -->
  <parameter name="transport.jms.ContentType">
    <rules>
      <jmsProperty>contentType</jmsProperty>
      <default>text/xml</default>
    </rules>
  </parameter>
  <!-- Caching and consumer counts -->
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <parameter name="transport.jms.ConcurrentConsumers">3</parameter>
  <parameter name="transport.jms.MaxConcurrentConsumers">3</parameter>
</proxy>

这是我的主要序列

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Sequence "CyberPlat_Main": step-driven payment flow.

  State (id, step, countAttempts, isRepeat) is read from the q:itemQueue
  element of the current message and stored in message context properties.
  The three step filters are then evaluated one after another, so a message
  that enters at step 1 can pass through all of them in a single run:
    step 1: sequence "CreatePayment", then vStep becomes 2
    step 2: sequence "ConfirmPayment"; when CpStatus is "RepeatStatus" the
            item is rebuilt and put back on the JMS queue, otherwise the
            payment is flagged as OK and vStep becomes 3
    step 3: sequences "ChangeStatus_ItemQueue" and "ChangeStatus_Payment"
-->
<sequence name="CyberPlat_Main" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
  <!-- ======================== -->
  <property name="NO_KEEPALIVE" scope="axis2" type="STRING" value="true"/>
  <!-- ========== Id ========== -->
  <property expression="//q:itemQueue/q:id" name="vId"
    scope="default" type="STRING" xmlns:q="http://QueueService"/>
  <log level="custom">
    <property expression="get-property('vId')" name="Id"/>
    <property name="Sequence 'CyberPlat_Main'" value="*********** BEGIN CYBERPALT***********"/>
  </log>
  <!-- ========== Step ========== -->
  <property expression="//q:itemQueue/q:step" name="vStep"
    scope="default" type="STRING" xmlns:q="http://QueueService"/>
  <!-- ========== CountAttempts ========== -->
  <!-- The attempt counter is an element of the message, so it is read with a
       plain XPath. get-property() expects a property NAME; passing it the
       XPath text looked up a property that does not exist, and the sum was
       therefore not a number. -->
  <property expression="number(//q:itemQueue/q:countAttempts)+1"
    name="vCountAttempts" scope="default" type="STRING" xmlns:q="http://QueueService"/>
  <!-- ========== Payload ========== -->
  <!-- NOTE(review): this reads p:payload (http://PayloadService). Confirm that
       the itemQueue messages reaching this sequence actually carry such an
       element, otherwise vPayload ends up empty. -->
  <property expression="//p:payload" name="vPayload" scope="default"
    type="OM" xmlns:p="http://PayloadService"/>
  <!-- ========== MaxAttempts ========== -->
  <property name="vMaxAttempts" scope="default" type="STRING" value="3"/>
  <!-- ========== IsRepeat ========== -->
  <property expression="//q:itemQueue/q:isRepeat" name="vIsRepeat"
    scope="default" type="STRING" xmlns:q="http://QueueService"/>
  <!-- Step1 -->
  <filter regex="1" source="$ctx:vStep">
    <then>
      <sequence key="CreatePayment"/>
      <!-- Advance so that the Step2 filter below matches in the same run -->
      <property name="vStep" scope="default" type="STRING" value="2"/>
    </then>
    <else/>
  </filter>
  <!-- Step2 -->
  <filter regex="2" source="$ctx:vStep">
    <then>
      <sequence key="ConfirmPayment"/>
      <!-- ======== Pay check result ========  -->
      <property expression="//a:PayResponse/a:CpStatus/text()" name="vStatus" scope="default" type="STRING" xmlns:a="http://CyberPlatService"/>
      <filter regex="RepeatStatus" source="$ctx:vStatus">
        <then>
          <!-- Need repeat request a status of payment -->
          <!-- vStep is still 2 here, so the re-queued item resumes at Step2 -->
          <property name="vIsRepeat" scope="default" type="STRING" value="1"/>
          <sequence key="ItemQueue"/>
          <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
          <call>
            <endpoint>
              <address trace="disable" uri="jms:/CyberPalt_Queue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
          </call>
        </then>
        <else>
          <log  level="custom">
            <property expression="get-property('vId')" name="Id"/>
            <property name="Sequence 'CyberPlat_Main'" value="*********** Payment is successful! ***********"/>
          </log>
          <property name="vOnlineStatus" scope="default" type="STRING" value="0"/>
          <property name="vErrorCode" scope="default" type="STRING" value="0"/>
          <property name="vErrorText" scope="default" type="STRING" value="OK"/>
          <property name="vStep" scope="default" type="STRING" value="3"/>
        </else>
      </filter>
    </then>
    <else/>
  </filter>
  <!-- Step3 End -->
  <filter regex="3" source="$ctx:vStep">
    <then>
      <sequence key="ChangeStatus_ItemQueue"/>
      <sequence key="ChangeStatus_Payment"/>
    </then>
    <else/>
  </filter>
  <log level="custom">
    <property expression="get-property('vId')" name="Id"/>
    <property name="Sequence 'CyberPlat_Main'" value="*********** END CYBERPALT ***********"/>
  </log>
</sequence>

这是处理重试次数的序列

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Sequence "CyberPlat_Attempts": decides what happens to an item after a fault.

  It logs the id together with the vMaxAttempts and vCountAttempts properties
  and then compares them:
    vMaxAttempts >= vCountAttempts : log an error line, mark the item as a
                                     repeat, rebuild the itemQueue message
                                     (sequence "ItemQueue") and put it on the
                                     JMS queue CyberPalt_Queue as a one-way
                                     message
    otherwise                      : set status/error properties (code 3, text
                                     taken from ERROR_MESSAGE) and run the
                                     sequences "ChangeStatus_ItemQueue" and
                                     "ChangeStatus_Payment"

  NOTE(review): the error log reads the property vMessage, which is not set
  anywhere in this sequence. Confirm it is populated by a caller.
-->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="CyberPlat_Attempts" trace="disable">
  <log level="custom">
    <property name="Id" expression="get-property('vId')"/>
    <property name="MaxAttempts" expression="get-property('vMaxAttempts')"/>
    <property name="CountAttempts" expression="get-property('vCountAttempts')"/>
  </log>
  <filter xpath="number($ctx:vMaxAttempts) &gt;= number($ctx:vCountAttempts)">
    <then>
      <!-- Attempts left: report the failure and re-queue the item -->
      <log level="custom" category="ERROR">
        <property name="Id" expression="get-property('vId')"/>
        <property name="CountAttempts" expression="get-property('vCountAttempts')"/>
        <property name="Message" expression="$ctx:vMessage"/>
      </log>
      <property name="vIsRepeat" value="1" scope="default" type="STRING"/>
      <sequence key="ItemQueue"/>
      <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
      <call>
        <endpoint>
          <address uri="jms:/CyberPalt_Queue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue" trace="disable"/>
        </endpoint>
      </call>
    </then>
    <else>
      <!-- No attempts left: record the failure and update the statuses -->
      <property name="vOnlineStatus" value="3" scope="default" type="STRING"/>
      <property name="vErrorCode" value="3" scope="default" type="STRING"/>
      <property name="vErrorText" expression="get-property('ERROR_MESSAGE')" scope="default" type="STRING"/>
      <sequence key="ChangeStatus_ItemQueue"/>
      <sequence key="ChangeStatus_Payment"/>
    </else>
  </filter>
</sequence>

用于构建有效负载的序列

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Sequence "ItemQueue": replaces the current message body with an itemQueue
  element (namespace http://QueueService) built from message context
  properties.

  Placeholder mapping, in argument order:
    $1 = vStep, $2 = vId, $3 = vCountAttempts, $4 = vIsRepeat,
    $5 = vPayload (inserted directly under itemQueue, after isRepeat)
-->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="ItemQueue" trace="disable">
  <payloadFactory media-type="xml">
    <format>
      <itemQueue xmlns="http://QueueService">
        <step>$1</step>
        <id>$2</id>
        <countAttempts>$3</countAttempts>
        <isRepeat>$4</isRepeat>
        $5
      </itemQueue>
    </format>
    <args>
      <!-- Scalar state values -->
      <arg expression="get-property('vStep')" evaluator="xml"/>
      <arg expression="get-property('vId')" evaluator="xml"/>
      <arg expression="get-property('vCountAttempts')" evaluator="xml"/>
      <arg expression="get-property('vIsRepeat')" evaluator="xml"/>
      <!-- Captured payload property -->
      <arg expression="$ctx:vPayload" evaluator="xml"/>
    </args>
  </payloadFactory>
</sequence>

我的 jms 传输设置

<!--
  JMS connection factory definition named "myQueueConnectionFactory".
  JNDI settings: initial context factory PropertiesFileInitialContextFactory,
  provider URL repository/conf/jndi.properties, JNDI name
  QueueConnectionFactory, factory type queue.
  Session settings: transacted sessions, CLIENT_ACKNOWLEDGE, 100 concurrent
  consumers (maximum 100).

  NOTE(review): SessionTransacted is true while SessionAcknowledgement is
  CLIENT_ACKNOWLEDGE. In JMS the acknowledge mode is ignored for a transacted
  session, so confirm which of the two is intended.
  NOTE(review): a service only uses this factory when it refers to it by name.
  Confirm the consuming proxy does so.
-->
<parameter name="myQueueConnectionFactory" locked="false">
  <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
  <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
  <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
  <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
  <parameter name="transport.jms.SessionTransacted">true</parameter>
  <parameter name="transport.jms.SessionAcknowledgement" locked="true">CLIENT_ACKNOWLEDGE</parameter>
  <parameter name="transport.jms.ConcurrentConsumers" locked="true">100</parameter>
  <parameter name="transport.jms.MaxConcurrentConsumers" locked="true">100</parameter>
</parameter>
4

0 回答 0