0

我正在尝试为主题(topic)消费者实现持久订阅功能。我已经给 JMS 消费者命名并设置了 clientId(当然也添加了 durable="true")。

据我读到的资料,该主题会在消费者第一次运行时将其注册为“持久”订阅者。

所以我基本上是这样做的:部署了生产者和消费者,消费者被注册为持久消费者。向主题发布一条消息,消费者收到了。然后我取消部署消费者,并发布另一条消息,消费者重新上线后应该收到它。但当我再次部署消费者时,却得到了常见的“temp-topic://XXXXXXXXXXXX 目的地不存在”错误。

为什么会这样?我不应该收到“丢失”的消息吗?

这是我当前为发布者配置的 jms activemq 连接器配置:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="CE-3.3.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd 
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd ">
    <!--
        ActiveMQ connector referenced by the JMS outbound endpoint of jmsFlow1.
        Points at the broker on tcp://localhost:61616 and identifies itself with
        clientId "RoutingTopic".
        NOTE(review): durable="true" presumably only affects topic subscribers, so it
        may be redundant on a connector that is only used for publishing. Confirm
        against the Mule JMS transport reference before removing it.
    -->
    <jms:activemq-connector name="Active_MQ"
                            doc:name="Active MQ"
                            specification="1.1"
                            brokerURL="tcp://localhost:61616"
                            clientId="RoutingTopic"
                            durable="true"
                            persistentDelivery="true"
                            maxRedelivery="1"
                            validateConnections="false">
        <!-- Reconnection strategy configured with count="5". -->
        <reconnect count="5"/>
    </jms:activemq-connector>

    <!--
        Named transformer that adds two message properties (overwrite="true"):
          BACKEND_SUBSCRIBER  taken from the flow variable "backend"; the consumer
                              configuration filters on this property in its selector.
          MULE_EVENT_TIMEOUT  fixed value 60000 (presumably milliseconds; TODO confirm).
        It is applied through transformer-refs on the JMS outbound endpoint.
    -->
    <message-properties-transformer name="MessagePropertiesTransformer"
                                    overwrite="true"
                                    doc:name="Message Properties">
        <add-message-property key="BACKEND_SUBSCRIBER"
                              value="#[flowVars['backend']]"/>
        <add-message-property key="MULE_EVENT_TIMEOUT"
                              value="60000"/>
    </message-properties-transformer>

    <!--
        jmsFlow1: receives HTTP requests on localhost:8081/jms, copies the inbound
        "id" and "backend" properties into flow variables, builds a test payload and
        sends it to the JMS topic ESB.Topic through the Active_MQ connector.
    -->
    <flow name="jmsFlow1" doc:name="jmsFlow1">
        <http:inbound-endpoint host="localhost"
                               port="8081"
                               path="jms"
                               exchange-pattern="request-response"
                               doc:name="HTTP"/>

        <!-- Both variable names are expressions that evaluate to string literals ('id', 'backend'). -->
        <set-variable doc:name="set dynamic id"
                      variableName="#['id']"
                      value="#[message.inboundProperties['id']]"/>
        <set-variable doc:name="setting backend"
                      variableName="#['backend']"
                      value="#[message.inboundProperties['backend']]"/>

        <!-- Payload is a fixed text followed by the "id" flow variable. -->
        <set-payload doc:name="set random string as payload"
                     value="#['This is a message test for id '] #[flowVars['id']]"/>

        <!-- The single "when" branch uses the constant #[true], so "otherwise" looks unreachable. -->
        <choice doc:name="Choice">
            <when expression="#[true]">
                <processor-chain>
                    <!--
                        NOTE(review): with exchange-pattern="request-response" this endpoint
                        presumably waits for a reply on a temporary destination. If the
                        subscriber only processes the message after a later redeploy, that
                        temporary destination may already be gone, which would match the
                        reported "temp-topic://... does not exist" error. Verify against the
                        Mule JMS transport reference; "one-way" may be the appropriate
                        pattern for durable publish/subscribe.
                    -->
                    <jms:outbound-endpoint topic="ESB.Topic"
                                           connector-ref="Active_MQ"
                                           exchange-pattern="request-response"
                                           transformer-refs="MessagePropertiesTransformer"
                                           doc:name="JMS Topic Requestor"/>
                </processor-chain>
            </when>
            <otherwise>
                <processor-chain>
                    <logger level="INFO"
                            message="This is the default case"
                            doc:name="Logger"/>
                </processor-chain>
            </otherwise>
        </choice>
    </flow>
</mule>

这是其中一个消费者的配置;我一共有 2 个,两者基本相同:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="CE-3.3.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd ">
    <!--
        ActiveMQ connector referenced by the consumer flow's JMS inbound endpoint.
        Connects to tcp://localhost:61616 with clientId "RandomName" and durable="true".
        NOTE(review): a durable subscription is presumably tied to this clientId, so
        the value should stay the same across redeployments. Confirm in the ActiveMQ
        web console that the subscription is listed as durable.
    -->
    <jms:activemq-connector name="UpCity_Connector"
                            doc:name="Active MQ"
                            specification="1.1"
                            brokerURL="tcp://localhost:61616"
                            clientId="RandomName"
                            durable="true"
                            maxRedelivery="0"
                            validateConnections="false"/>
    <!--
        jmsAdapterConsumerFlow1: subscribes to the topic ESB.Topic through
        UpCity_Connector, appends a marker text to the payload and logs the result.
        The doc:name label now matches the flow name (it was misspelled "fms...").
    -->
    <flow name="jmsAdapterConsumerFlow1" doc:name="jmsAdapterConsumerFlow1">
        <!--
            NOTE(review): with exchange-pattern="request-response" the flow result is
            presumably sent back to the reply destination carried by the message. For a
            message delivered after a redeploy that destination may no longer exist.
            Verify against the Mule JMS transport reference.
        -->
        <jms:inbound-endpoint topic="ESB.Topic"
                              connector-ref="UpCity_Connector"
                              exchange-pattern="request-response"
                              doc:name="JMS Replier Consumer">
            <!-- Only messages whose BACKEND_SUBSCRIBER property equals 'randombackend' are selected. -->
            <jms:selector expression="BACKEND_SUBSCRIBER='randombackend'"/>
        </jms:inbound-endpoint>

        <!-- Append a fixed suffix to the incoming payload. -->
        <set-payload doc:name="Add string to payload"
                     value="#[payload + ' returned from a random backend']"/>

        <!-- Log the resulting payload at INFO level. -->
        <logger level="INFO"
                message="#[payload]"
                doc:name="Logger"/>
    </flow>
</mule>

谢谢。

4

1 回答 1

1

持久消息在 JMS 中有点棘手,因为您的配置/代码必须满足几个标准才能使其正常工作:

  • 在您的代理配置中启用持久性(在 ActiveMQ 中,这可以在 David 发布的连接字符串中完成,也可以在您的配置 XML 中完成)
  • 创建一个持久订阅者(ActiveMQ API)

我不确定 Mule 是否会创建持久订阅者。不过,您可以在 ActiveMQ 的 Web 控制台中进行检查:在那里可以看到当前持久订阅的列表。

于 2013-08-16T07:47:31.380 回答