20

有没有办法抑制 ActiveMQ 服务器上定义的队列上的重复消息?

我尝试手动定义 JMSMessageID,(message.setJMSMessageID("uniqueid")),但服务器忽略此修改并使用内置生成的 JMSMessageID 传递消息。

按照规范,我没有找到有关如何删除重复消息的参考。

在 HornetQ 中,为了处理这个问题,我们需要在消息定义中声明 HQ 特定的属性 org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。

IE:

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

有人知道 ActiveMQ 是否有类似的解决方案?

4

5 回答 5

7

您应该查看 Apache Camel,它提供了一个可以与任何 JMS 提供程序一起使用的幂等消费者组件,请参阅:http ://camel.apache.org/idempotent-consumer.html

将它与 ActiveMQ 组件结合使用使得 JMS 的使用变得非常简单,请参阅: http ://camel.apache.org/activemq.html

于 2011-02-09T11:55:14.110 回答
5

我怀疑 ActiveMQ 是否原生支持它,但实现幂等消费者应该很容易。一种方法是在生产者端为每条消息添加一个唯一标识符,现在在消费者端使用存储(数据库、缓存等),可以检查消息之前是否已收到,并且根据该检查继续处理。

我看到了前面的 stackoverflow 问题 - Apache ActiveMQ 5.3 - 如何配置队列以拒绝重复消息?,这也可能有所帮助。

于 2011-02-09T00:41:31.217 回答
4

现在支持删除复制到 ActiveMQ 传输中的重复消息。请参阅配置值auditDepthauditMaximumProducerNumber连接配置指南

于 2013-08-08T15:49:23.740 回答
3

有一种方法可以使 ActiveMQ 根据 JMS 属性过滤重复项。它涉及编写一个 Activemq插件。将重复消息发送到死信队列的基本代理过滤器将如下所示

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.broker.ProducerBrokerExchange;

public class DuplicateFilterBroker extends BrokerFilter {
    String messagePropertyName;
    boolean switchValue;

    public DuplicateFilterBroker(Broker next, String messagePropertyName) {
        super(next);
        this.messagePropertyName = messagePropertyName;
    }

    public boolean hasDuplicate(String propertyValue){
        switchValue = propertyValue;
        return switchValue;
    }

    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception { 
        ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
        Object msgObj = msg.getMessage(); 
        if (msgObj instanceof javax.jms.Message) { 
            javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
            if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) {
                super.send(producerExchange, msg);
            }
            else {
               sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
            } 
        }
    }  
}
于 2012-08-27T09:34:47.147 回答
0

似乎问题中建议的方式也适用于 ActiveMQ(2016/12)。请参阅activemq-artemis 指南。这需要生产者在消息中设置特定的属性。

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";   // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

但是包含该属性的类是不同的: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID并且属性值为_AMQ_DUPL_ID.

于 2016-12-13T02:25:16.897 回答