我在 Red Hat 2.6.32 上运行 WSO2 ESB 4.5.1 和 Sun Java SE 1.6.0_33,使用 Apache ActiveMQ 5.5.1 作为 ESB 的持久存储。
我正在尝试编写一个流程:
1. 从 XML 文件中读取地址信息
2. 将每个地址保存在 MQ 队列中
3. 从 MQ 读取消息处理器并尝试传递到 JMS 端点
4. 如果传递失败消息处理器将在停用自身之前再尝试一次交付
当 JMS 端点(在另一台服务器上运行的另一个 ActiveMQ 实例)启动时,一切正常,但是如果我停止充当 JMS 端点的 ActiveMQ,消息处理器不会像我预期的那样在重试后停用,而是不断尝试重新发送来自消息存储的相同消息。出了什么问题?
这是我的流程:
代理人:
<proxy xmlns="http://ws.apache.org/ns/synapse" name="AddressPxy" transports="vfs" statistics="disable" trace="disable" startOnLoad="true">
<target>
<inSequence>
<property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
<property name="target.endpoint" value="AddressesEP" scope="default" type="STRING"/>
<log level="full"/>
<iterate expression="//addresses/address">
<target>
<sequence>
<store messageStore="AddressesMS"/>
</sequence>
</target>
</iterate>
</inSequence>
</target>
<parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
<parameter name="transport.PollInterval">15</parameter>
<parameter name="transport.vfs.MoveAfterProcess">file:///home/esb/sent</parameter>
<parameter name="transport.vfs.FileURI">file:///home/esb/addresses.xml</parameter>
<parameter name="transport.vfs.MoveAfterFailure">file:///home/esb/fail</parameter>
<parameter name="transport.vfs.ContentType">application/xml</parameter>
<parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
<description></description>
</proxy>
消息存储:
<messageStore name="AddressesMS" class="org.wso2.carbon.message.store.persistence.jms.JMSMessageStore" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
<parameter name="store.jms.JMSSpecVersion">1.1</parameter>
<parameter name="store.jms.cache.connection">false</parameter>
</messageStore>
消息处理器:
<messageProcessor name="AddressesMP" class="org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor" messageStore="AddressesMS" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="interval">10000</parameter>
<parameter name="max.delivery.attempts">1</parameter>
</messageProcessor>
端点:
<endpoint xmlns="http://ws.apache.org/ns/synapse" name="AddressesEP">
<address uri="jms:/Addresses?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://rhdev001:61616&transport.jms.DestinationType=queue">
</address>
</endpoint>
我在 wso2carbon.log 中看到此错误消息:
TID: [0] [ESB] [2012-11-07 02:58:07,396] ERROR {org.apache.axis2.transport.jms.JMSSender} - Unable to create a JMSMessageSender for : null {org.apache.axis2.transport.jms.JMSSender}
javax.jms.JMSException: Could not connect to broker URL: tcp://rhdev001:61616. Reason: java.net.ConnectException: Connection refused
TID: [0] [ESB] [2012-11-07 02:58:07,398] ERROR {org.apache.synapse.message.processors.forward.BlockingMessageSender} - Error sending Message to url : jms:/Addresses?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://rhdev001:61616&transport.jms.DestinationType=queue {org.apache.synapse.message.processors.forward.BlockingMessageSender}
org.apache.axis2.AxisFault: Unable to create a JMSMessageSender for : null
TID: [0] [ESB] [2012-11-07 02:58:07,399] ERROR {org.apache.synapse.message.processors.forward.ForwardingJob} - Error Forwarding Message {org.apache.synapse.message.processors.forward.ForwardingJob}
java.lang.Exception: Error while Sending Message
希望可以有人帮帮我!