我有一个 activemq 配置,其中我有一个虚拟目的地和一个普通主题
我想根据消息头中的 JMSType 将所有 JMS 消息路由到目的地(VirtualTopic.Notifications)到 2 个队列(VirtualTopic.SMS、VirtualTopic.EMAIL)。
我希望正常的 Topic(VirtualTopic.gps) 像往常一样工作。
这是我对activemq.xml 的配置。这里创建了 Consumer.SMS.VirtualTopic 和 Consumer.EMAIL.VirtualTopic。
<destinations>
<queue physicalName="Consumer.SMS.VirtualTopic" />
<queue physicalName="Consumer.EMAIL.VirtualTopic" />
</destinations>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
<forwardTo>
<filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
<filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
而消费者和主题(VirtualTopic.gps)是从服务器端代码创建的。
private static MessageProducer getTopicProducer(String topicName) throws JMSException {
MessageProducer producer = topicProducers.get(topicName);
if (producer == null) {
logger.info("Creating message producer for Topic : {}", topicName);
Destination destination = session.createTopic(topicName);
List<String> queueNames = PropertyReader
.getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
if (queueNames != null) {
for (String queueName : queueNames) {
Queue virtualQueue = session.createQueue(queueName);
MessageConsumer con = session.createConsumer(virtualQueue);
con.close();
}
}
producer = session.createProducer(destination);
topicProducers.put(topicName, producer);
}
return producer;
}
VirtualTopic.Notifications 的所有消息都被路由到 2 个不同的队列,消费者可以从各自的队列中获取消息
但问题是所有发送到 VirtualTopic.gps 的消息都被过滤了,消费者不能消费 gps 消息。