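I have an ActiveMQ configuration with a virtual destination and a normal topic.

I want to route all JMS messages sent to the destination VirtualTopic.Notifications to 2 queues (VirtualTopic.SMS, VirtualTopic.EMAIL) based on the JMSType in the message header.

I want the normal topic (VirtualTopic.gps) to keep working as usual.

This is my configuration in activemq.xml. The queues Consumer.SMS.VirtualTopic and Consumer.EMAIL.VirtualTopic are created here.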

    <destinations>
        <queue physicalName="Consumer.SMS.VirtualTopic" />
        <queue physicalName="Consumer.EMAIL.VirtualTopic" />
    </destinations>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
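
To make the intended routing concrete, here is a minimal plain-Java sketch of what the two filteredDestination selectors above are expected to do. JmsTypeRouter and route are hypothetical names used only for illustration; this models the behaviour and is not how ActiveMQ itself evaluates selectors.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the two filteredDestination entries; not an ActiveMQ API.
final class JmsTypeRouter {
    private final Map<String, String> queueByType = new LinkedHashMap<>();

    JmsTypeRouter() {
        // Mirrors selector="JMSType = 'SMS'" and selector="JMSType = 'EMAIL'".
        queueByType.put("SMS", "Consumer.SMS.VirtualTopic");
        queueByType.put("EMAIL", "Consumer.EMAIL.VirtualTopic");
    }

    // Returns the queue a message with the given JMSType is forwarded to,
    // or an empty Optional when no selector matches.
    Optional<String> route(String jmsType) {
        return Optional.ofNullable(queueByType.get(jmsType));
    }

    public static void main(String[] args) {
        JmsTypeRouter router = new JmsTypeRouter();
        System.out.println(router.route("SMS").orElse("(not forwarded)"));
        System.out.println(router.route("OTHER").orElse("(not forwarded)"));
    }
}
```

A message whose JMSType matches neither selector is not forwarded to either queue in this model.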

The consumers and the topic (VirtualTopic.gps) are created from the server-side code.

    private static MessageProducer getTopicProducer(String topicName) throws JMSException {
        // Reuse the cached producer for this topic if one exists.
        MessageProducer producer = topicProducers.get(topicName);

        if (producer == null) {
            logger.info("Creating message producer for Topic : {}", topicName);
            Destination destination = session.createTopic(topicName);

            // Open and immediately close a consumer on each configured queue,
            // presumably so that the broker creates the queue up front.
            List<String> queueNames = PropertyReader
                    .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
            if (queueNames != null) {
                for (String queueName : queueNames) {
                    Queue virtualQueue = session.createQueue(queueName);
                    MessageConsumer con = session.createConsumer(virtualQueue);
                    con.close();
                }
            }

            producer = session.createProducer(destination);
            topicProducers.put(topicName, producer);
        }

        return producer;
    }

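All messages sent to VirtualTopic.Notifications are routed to the 2 different queues, and consumers can pick them up from their respective queues.

The problem is that all messages sent to VirtualTopic.gps are being filtered out, and consumers cannot consume the gps messages.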

2 Answers

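Thanks a lot, Hassen.

Adding the line <virtualTopic name=">" selectorAware="false" /> to activemq.xml did the trick. As far as I can tell, declaring virtualDestinations explicitly replaces the broker's default virtual topic setup, so the catch-all virtualTopic entry has to be added back for VirtualTopic.gps to work.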

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeQueue name="VirtualTopic.Notifications"
                    forwardOnly="false">
                    <forwardTo>
                        <filteredDestination selector="JMSType = 'SMS'"
                            queue="Consumer.SMS.VirtualTopic" />
                        <filteredDestination selector="JMSType ='EMAIL'"
                            queue="Consumer.EMAIL.VirtualTopic" />
                    </forwardTo>
                </compositeQueue>
                <virtualTopic name=">" selectorAware="false" />
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
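
For anyone wondering why name=">" covers VirtualTopic.gps: in ActiveMQ destination wildcards, "." separates path elements, "*" matches a single element and ">" matches everything from that point on. Below is a simplified plain-Java sketch of that matching rule. DestinationWildcard and matches are hypothetical names, and the broker's real matcher may treat edge cases differently.

```java
// Simplified model of ActiveMQ-style destination wildcards; not the broker's own code.
final class DestinationWildcard {

    // '.' separates path elements, '*' matches exactly one element,
    // '>' matches everything from its position onward.
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // the rest of the destination is accepted as-is
            }
            if (i >= d.length) {
                return false; // pattern has more elements than the destination
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal element differs
            }
        }
        // Without '>', pattern and destination must have the same number of elements.
        return p.length == d.length;
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "VirtualTopic.gps"));
        System.out.println(matches("VirtualTopic.*", "VirtualTopic.a.b"));
    }
}
```

With name=">" every topic name matches, so the broker treats every topic, including VirtualTopic.gps, as a virtual topic.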

answered 2017-01-18T16:06:38.537

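The following example shows how to set up a <compositeQueue/> element in the XML configuration so that a message sent to MY.QUEUE is actually forwarded to the physical queue FOO and the topic BAR.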

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="MY.QUEUE">
            <forwardTo>
              <queue physicalName="FOO" />
              <topic physicalName="BAR" />
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

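By default, subscribers cannot consume messages directly from a composite queue or topic; it is only a logical construct. Given the configuration above, subscribers can only consume messages from FOO and BAR, but not from MY.QUEUE. This behaviour can be changed by setting the optional forwardOnly attribute to false, which enables use cases such as monitoring a queue by sending the same messages to a notification topic (wire tapping).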

    <compositeQueue name="IncomingOrders" forwardOnly="false">
        <forwardTo>
            <topic physicalName="Notifications" />
        </forwardTo>
    </compositeQueue>

Every message sent to IncomingOrders is copied and forwarded to Notifications, and is then placed on the physical IncomingOrders queue for subscribers to consume.
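
The effect of forwardOnly can be summarised with a small plain-Java model. This is only a sketch of the semantics described above, assuming the default forwardOnly="true"; CompositeQueueModel and destinations are hypothetical names, not ActiveMQ classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a composite queue; it only mirrors the documented semantics.
final class CompositeQueueModel {
    private final String name;
    private final List<String> forwardTo;
    private final boolean forwardOnly;

    CompositeQueueModel(String name, List<String> forwardTo, boolean forwardOnly) {
        this.name = name;
        this.forwardTo = new ArrayList<>(forwardTo);
        this.forwardOnly = forwardOnly;
    }

    // Destinations that end up holding a copy of a message sent to this composite queue.
    List<String> destinations() {
        List<String> targets = new ArrayList<>(forwardTo);
        if (!forwardOnly) {
            // With forwardOnly="false" the composite queue itself also keeps the message.
            targets.add(name);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(new CompositeQueueModel(
                "MY.QUEUE", Arrays.asList("FOO", "BAR"), true).destinations());
        System.out.println(new CompositeQueueModel(
                "IncomingOrders", Arrays.asList("Notifications"), false).destinations());
    }
}
```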

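Take a look here: http://activemq.apache.org/virtual-destinations.html

With your actual configuration you can only consume from the SMS and EMAIL queues. If you want to consume from Notifications as well, you need to set forwardOnly="false".

UPDATE: try this code: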

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQMessageConsumer;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.command.ActiveMQTextMessage;

    public class SimpleSenderConsumerVirtualTopic {

        public static void main(String[] args) throws JMSException {
            Connection conn = null;
            try {
                // The no-argument factory connects to the default broker URL.
                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
                conn = cf.createConnection();
                ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                        ActiveMQSession.AUTO_ACKNOWLEDGE);
                // Subscribe through the consumer queue before publishing to the virtual topic.
                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                        .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
                MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
                conn.start();
                ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("VirtualTopic.gps test");
                producer.send(msg);
                // Drain the queue; stop once nothing arrives within 5 seconds.
                while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
                    System.out.println("Received message is: " + msg.getText());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e) {
                        // Nothing useful can be done if closing fails.
                    }
                }
            }
        }
    }

and add:

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
          <virtualTopic name=">" selectorAware="false" />
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
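
The queue name used in the test class follows the default virtual topic convention, where a consumer reads from a queue named Consumer.<name>.<topic>. A tiny helper makes that explicit. VirtualTopicNames and consumerQueue are hypothetical names, and the sketch assumes the default "Consumer.*." prefix has not been overridden on the virtualTopic element.

```java
// Hypothetical helper; assumes the default virtual topic prefix "Consumer.*.".
final class VirtualTopicNames {

    // Builds the queue a logical consumer reads from for the given virtual topic.
    static String consumerQueue(String consumerName, String topicName) {
        return "Consumer." + consumerName + "." + topicName;
    }

    public static void main(String[] args) {
        System.out.println(consumerQueue("A", "VirtualTopic.gps"));
    }
}
```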

answered 2017-01-18T10:38:22.697