1

我正在使用虚拟目的地在 ActiveMQ 5.15.13 中实现发布订阅模型。

我有一个虚拟主题VirtualTopic,并且绑定了两个队列。每个队列都有自己的重新传递策略。假设Queue 1在处理消息时出现异常,将重试消息 2 次,Queue 2并将重试消息 3 次。发布重试消息将被发送到死信队列。我还使用了单独的死信队列策略,以便每个队列都有自己的死信队列。

我观察到,当一条消息被发送到 时VirtualTopic,具有相同消息 ID 的消息被传递到两个队列。我面临一个问题,如果两个队列的消费者都无法成功处理消息。发往的消息Queue 1在重试 2 次后被移动到死信队列。但是没有死信队列Queue 2,虽然队列 2 中的消息被重试了 3 次。

这是预期的行为吗?

代码:

public class ActiveMQRedelivery {

    private final ActiveMQConnectionFactory factory;

    public ActiveMQRedelivery(String brokerUrl) {
        factory = new ActiveMQConnectionFactory(brokerUrl);
        factory.setUserName("admin");
        factory.setPassword("password");
        factory.setAlwaysSyncSend(false);
    }

    public void publish(String topicAddress, String message) {
        final String topicName = "VirtualTopic." + topicAddress;
        try {
            final Connection producerConnection = factory.createConnection();
            producerConnection.start();
            final Session producerSession = producerConnection.createSession(false, AUTO_ACKNOWLEDGE);
            final MessageProducer producer = producerSession.createProducer(null);
            final TextMessage textMessage = producerSession.createTextMessage(message);
            final Topic topic = producerSession.createTopic(topicName);
            producer.send(topic, textMessage, PERSISTENT, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE);
        } catch (JMSException e) {
            throw new RuntimeException("Message could not be published", e);
        }

    }

    public void initializeConsumer(String queueName, String topicAddress, int numOfRetry) throws JMSException {
        factory.getRedeliveryPolicyMap().put(new ActiveMQQueue("*." + queueName + ".>"),
                getRedeliveryPolicy(numOfRetry));
        Connection connection = factory.createConnection();
        connection.start();
        final Session consumerSession = connection.createSession(false, CLIENT_ACKNOWLEDGE);
        final Queue queue = consumerSession.createQueue("Consumer." + queueName +
                ".VirtualTopic." + topicAddress);
        final MessageConsumer consumer = consumerSession.createConsumer(queue);
        consumer.setMessageListener(message -> {
            try {
                    System.out.println("in listener --- " + ((ActiveMQDestination)message.getJMSDestination()).getPhysicalName());
                    consumerSession.recover();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    private RedeliveryPolicy getRedeliveryPolicy(int numOfRetry) {
        final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setMaximumRedeliveries(numOfRetry);
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        redeliveryPolicy.setRedeliveryDelay(0);
        return redeliveryPolicy;
    }

}

测试:

public class ActiveMQRedeliveryTest {

    private static final String brokerUrl = "tcp://0.0.0.0:61616";
    private ActiveMQRedelivery activeMQRedelivery;

    @Before
    public void setUp() throws Exception {
        activeMQRedelivery = new ActiveMQRedelivery(brokerUrl);
    }

    @Test
    public void testMessageRedeliveries() throws Exception {
        String topicAddress = "testTopic";
        activeMQRedelivery.initializeConsumer("queue1", topicAddress, 2);
        activeMQRedelivery.initializeConsumer("queue2", topicAddress, 3);
        activeMQRedelivery.publish(topicAddress, "TestMessage");
        Thread.sleep(3000);
    }

    @After
    public void tearDown() throws Exception {
    }
}
4

1 回答 1

0

我最近遇到了这个问题。要解决此问题,需要将 2 个属性添加到 individualDeadLetterStrategy 中,如下所示

<deadLetterStrategy>
        <individualDeadLetterStrategy destinationPerDurableSubscriber="true" enableAudit="false" queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>

属性说明:

destinationPerDurableSubscriber - 为每个持久订阅者启用单独的目标。

enableAudit - 死信策略具有默认启用的消息审核。这可以防止将重复消息添加到配置的 DLQ。启用该属性后,当destinationPerDurableSubscriber 属性设置为true 时,未为多个订阅者传递到一个主题的同一消息将仅放置在其中一个订阅者DLQ 上,即两个消费者未能确认同一消息的同一消息。主题,该消息将仅放置在一个消费者的 DLQ 上,而不是另一个消费者。

于 2022-01-19T13:40:29.460 回答