我们的 Spring 5 应用程序被配置为使用 ActiveMQ 并持久化消息。我们决定使用 KahaDB 文件存储。
为了测试消息持久性,我注释掉了 MessageListener bean 并将消息发送到队列(通过 JmsTemplate),并验证消息已写入 kahadb 数据日志文件。取消注释 MessageListener bean 并重新启动代理后,消息不会传递到侦听器。我无法弄清楚为什么在代理重新启动时没有传递消息,任何帮助将不胜感激。
下面是将 KahaDb 持久性添加到 ActiveMQ 配置的代码:
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService brokerServiceConfig() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.addConnector("vm://localhost");
brokerService.setBrokerName("order-broker");
PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
File kahaDir = new File("/home/test");
kahaDbAdapter.setDirectory(kahaDir);
brokerService.setPersistenceAdapter(kahaDbAdapter);
brokerService.setPersistent(true);
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.PERSISTENT);
return template;
}
@Bean
@DependsOn({"brokerService"})
private static ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("vm://my-broker?create=false");
List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
trustedPackageList.add("com.mypackages");
activeMQConnectionFactory.setTrustedPackages(trustedPackageList);
CachingConnectionFactory connFactory = new CachingConnectionFactory();
activeMQConnectionFactory.setCopyMessageOnSend(false);
activeMQConnectionFactory.setUseAsyncSend(true);
connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
return connFactory;
}
@Bean
public DefaultMessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setConnectionFactory(connectionFactory());
dmlc.setSessionTransacted(false);
dmlc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
dmlc.setDestinationName(TEST_QUEUE);
dmlc.setMaxConcurrentConsumers(25);
dmlc.setMessageListener(msgListener());
return container;
}
@Bean
public TestMsgListener msgListener() {
return new TestMsgListener(); // This is a Message Driven POJO.
}
// MessageProducer code -
@Autowired
private JmsTemplate jTemplate;
@Autowired
private ActiveMQQueue testQueue;
public void sendMessage() {
try {
MySerializedObject obj = <code to create new object>;
jTemplate.convertAndSend(this.testQueue, obj);
}catch(Throwable e) {
}
}
// MessageListener code -
public class TestMsgListener implements MessageListener {
@Autowired
public MessageConverter converter;
public final void onMessage(Message message) {
try {
MySerializedObject obj =
(MySerializedObject)converter.fromMessage(message));
} catch (Throwable e) {
// log error.
}
}