2

我在 Red Hat 2.6.32 上运行 WSO2 ESB 4.5.1 和 Sun Java SE 1.6.0_33,使用 Apache ActiveMQ 5.5.1 作为 ESB 的持久存储。

我正在尝试编写一个流程:
1. 从 XML 文件中读取地址信息
2. 将每个地址保存在 MQ 队列中
3. 从 MQ 读取消息处理器并尝试传递到 JMS 端点
4. 如果传递失败消息处理器将在停用自身之前再尝试一次交付

当 JMS 端点(在另一台服务器上运行的另一个 ActiveMQ 实例)启动时,一切正常,但是如果我停止充当 JMS 端点的 ActiveMQ,消息处理器不会像我预期的那样在重试后停用,而是不断尝试重新发送来自消息存储的相同消息。出了什么问题?

这是我的流程:

代理人:

<proxy xmlns="http://ws.apache.org/ns/synapse" name="AddressPxy" transports="vfs" statistics="disable" trace="disable" startOnLoad="true">
<target>
  <inSequence>
     <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
     <property name="target.endpoint" value="AddressesEP" scope="default" type="STRING"/>
     <log level="full"/>
     <iterate expression="//addresses/address">
        <target>
           <sequence>
              <store messageStore="AddressesMS"/>
           </sequence>
        </target>
      </iterate>
    </inSequence>
  </target>
  <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
  <parameter name="transport.PollInterval">15</parameter>
  <parameter name="transport.vfs.MoveAfterProcess">file:///home/esb/sent</parameter>
  <parameter name="transport.vfs.FileURI">file:///home/esb/addresses.xml</parameter>
  <parameter name="transport.vfs.MoveAfterFailure">file:///home/esb/fail</parameter>
  <parameter name="transport.vfs.ContentType">application/xml</parameter>
  <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
  <description></description>
</proxy>

消息存储:

<messageStore name="AddressesMS" class="org.wso2.carbon.message.store.persistence.jms.JMSMessageStore" xmlns="http://ws.apache.org/ns/synapse">
  <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
  <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
  <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
  <parameter name="store.jms.cache.connection">false</parameter>
</messageStore> 

消息处理器:

<messageProcessor name="AddressesMP" class="org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor" messageStore="AddressesMS" xmlns="http://ws.apache.org/ns/synapse">
  <parameter name="interval">10000</parameter>
  <parameter name="max.delivery.attempts">1</parameter>
</messageProcessor> 

端点:

<endpoint xmlns="http://ws.apache.org/ns/synapse" name="AddressesEP">
  <address uri="jms:/Addresses?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://rhdev001:61616&transport.jms.DestinationType=queue">
  </address>
</endpoint>

我在 wso2carbon.log 中看到此错误消息:

TID: [0] [ESB] [2012-11-07 02:58:07,396] ERROR {org.apache.axis2.transport.jms.JMSSender} -  Unable to create a JMSMessageSender for : null {org.apache.axis2.transport.jms.JMSSender}  
javax.jms.JMSException: Could not connect to broker URL: tcp://rhdev001:61616. Reason: java.net.ConnectException: Connection refused  
TID: [0] [ESB] [2012-11-07 02:58:07,398] ERROR {org.apache.synapse.message.processors.forward.BlockingMessageSender} -  Error sending Message to url : jms:/Addresses?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://rhdev001:61616&transport.jms.DestinationType=queue {org.apache.synapse.message.processors.forward.BlockingMessageSender}  
org.apache.axis2.AxisFault: Unable to create a JMSMessageSender for : null  
TID: [0] [ESB] [2012-11-07 02:58:07,399] ERROR {org.apache.synapse.message.processors.forward.ForwardingJob} -  Error Forwarding Message  {org.apache.synapse.message.processors.forward.ForwardingJob}  
java.lang.Exception: Error while Sending Message

希望可以有人帮帮我!

4

1 回答 1

0

在此版本的 ESB 中,消息处理器中的“max.delivery.attempts”参数存在一个小问题。但是您可以通过转到源视图并将“max.delivery.attempts”更改为“max.deliver.attempts”来完成这项工作。

于 2012-11-22T05:46:10.253 回答