消息的生产者不会持久发送消息,当我尝试通过 MessageListener 使用消息时,如果发生任何异常(运行时),它会重试特定次数(默认为 AMQ 端的 6 次)并且消息丢失。
原因是生产者没有将 Delivery 模式设置为 Persistent,经过一定次数的重试后,DLQ 没有被创建,消息也没有移动到 DLQ。因此,我丢失了消息。
我的代码是这样的:-
@Configuration
@PropertySource("classpath:application.properties")
public class ActiveMqJmsConfig {
@Autowired
private AbcMessageListener abcMessageListener;
public DefaultMessageListenerContainer purchaseMsgListenerforAMQ(
@Qualifier("AMQConnectionFactory") ConnectionFactory amqConFactory) {
LOG.info("Message listener for purchases from AMQ : Starting");
DefaultMessageListenerContainer defaultMessageListenerContainer =
new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(amqConFactory);
defaultMessageListenerContainer.setMaxConcurrentConsumers(4);
defaultMessageListenerContainer
.setDestinationName(purchaseReceivingQueueName);
defaultMessageListenerContainer
.setMessageListener(abcMessageListener);
defaultMessageListenerContainer.setSessionTransacted(true);
return defaultMessageListenerContainer;
}
@Bean
@Qualifier(value = "AMQConnectionFactory")
public ConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory amqConnectionFactory =
new ActiveMQConnectionFactory();
amqConnectionFactory
.setBrokerURL(System.getProperty(tcp://localhost:61616));
amqConnectionFactory
.setUserName(System.getProperty(admin));
amqConnectionFactory
.setPassword(System.getProperty(admin));
return amqConnectionFactory;
}
}
@Component
public class AbcMessageListener implements MessageListener {
@Override
public void onMessage(Message msg) {
//CODE implementation
}
}
问题:- 通过在连接级别设置客户端 ID (Connection.setclientid("String")),即使消息不是持久的,我们也可以订阅为持久订阅者。通过这样做,如果应用程序抛出运行时异常,在一定次数的重试尝试后,将为队列创建 DLQ,并将消息移动到 DLQ。
但是在 DefaultMessageListenerContainer 中,连接不会暴露给客户端。我猜它是由 Class 本身作为一个池维护的。
如何在 DefaultMessageListenerContainer 中实现持久订阅?