最后,我找到了一种解决方法,允许我只为队列的一个子集创建 MirroredQueues,给定一个前缀。
我所做的是创建自己的 DestinationInterceptor,以便仅为我感兴趣的那些队列创建一个镜像队列,并排除其余队列(因为默认的MirroredQueue实现镜像了系统中创建的所有队列)。
我是怎么做到的。我将 MirroredQueue.java 类实现从库中复制到了一个名为 CustomMirroredQueue 的新类中,并向名为mirroring的类添加了一个新属性。我从接口DestinationInterceptor修改了intercept(final Destination destination)实现,考虑到 if 语句中的这个新属性(我为它创建了一个名为isPrefixMirrored的辅助方法):
/*
* This method is responsible for intercepting all the queues/topics that are created in the system.
* In this particular case we are interested only in the queues, in order we can mirror *some* of them and get
* a copy of the messages that are sent to them (With the topics this mirroring is not necessary since we would just
* subscribe to that topic for receiving the same message).
* */
public Destination intercept(final Destination destination) {
if (destination.getActiveMQDestination().isQueue()) {
if (isPrefixMirrored(destination) && (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues())) {
try {
//we create a mirrored queue for that destination
final Destination mirrorDestination = getMirrorDestination(destination);
if (mirrorDestination != null) {
return new DestinationFilter(destination) {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
message.setDestination(mirrorDestination.getActiveMQDestination());
mirrorDestination.send(context, message);
if (isCopyMessage()) {
message = message.copy();
}
message.setDestination(destination.getActiveMQDestination());
message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
super.send(context, message);
}
};
}
} catch (Exception e) {
LOG.error("Failed to lookup the mirror destination for: {}", destination, e);
}
}
}
return destination;
}
/*
* @returns true if the destination passed as parameter will be mirrored. If the value for the attribute "mirroring"
* is an empty string "" then all the queues will be mirrored by default.
**/
private boolean isPrefixMirrored(Destination destination) {
if (mirroring.equals("")) {
return true;
}
List<String> mirroredQueuesPrefixes = Arrays.asList(mirroring.split(","));
final String destinationPhysicalName = destination.getActiveMQDestination().getPhysicalName();
return mirroredQueuesPrefixes.stream().map(String::trim).anyMatch(destinationPhysicalName::contains);
}
我生成了一个仅包含此自定义类和依赖项(为此使用 gradle)的 .jar,并将其添加到 ActimeMQ 代理安装中的 lib 文件夹中。然后我可以在 ActiveMQ 配置 XML 文件中使用这个标签作为bean :
<destinationInterceptors>
<bean xmlns="http://www.springframework.org/schema/beans" class="package.CustomMirroredQueue" id="CustomMirroredQueue">
<property name="copyMessage" value="true"/>
<property name="postfix" value=""/>
<property name="prefix" value="mirror."/>
<property name="mirroring" value="PREFIX_1, QUEUE2, QUEUE3"/>
</bean>
</destinationInterceptors>
该类必须包含该类在 ActiveMQ 库文件夹中的路径。属性 copyMessage、postfix 和 prefix 来自默认的 MirroredQueue 实现。并且镜像属性将是一个列表,其中包含要镜像的所有特定队列/前缀(并且只有那些)。