0

我们的 Spring 5 应用程序被配置为使用 ActiveMQ 并持久化消息。我们决定使用 KahaDB 文件存储。

为了测试消息持久性,我注释掉了 MessageListener bean,通过 JmsTemplate 将消息发送到队列,并确认消息已写入 KahaDB 数据日志文件。随后取消注释 MessageListener bean 并重新启动代理,消息却没有传递到侦听器。我无法弄清楚为什么代理重新启动后消息没有被传递,任何帮助都将不胜感激。

下面是将 KahaDB 持久性添加到 ActiveMQ 配置的代码:

   /**
    * Builds the embedded ActiveMQ broker with KahaDB file-based persistence.
    *
    * <p>The broker is named {@code order-broker}, exposes a {@code vm://localhost}
    * connector and stores its journal under {@code /home/test}. The {@code @Bean}
    * attributes name {@code start} and {@code stop} as the init and destroy methods.
    *
    * @return the configured, not yet started, broker
    * @throws Exception if the connector cannot be added or the persistence
    *         adapter cannot be installed
    */
   @Bean(initMethod = "start", destroyMethod = "stop")
   public BrokerService brokerServiceConfig() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.addConnector("vm://localhost");
        brokerService.setBrokerName("order-broker");

        // KahaDB journal location; persistent messages are written here.
        PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
        File kahaDir = new File("/home/test");
        kahaDbAdapter.setDirectory(kahaDir);
        brokerService.setPersistenceAdapter(kahaDbAdapter);
        brokerService.setPersistent(true);

        // The original body fell off the end without returning the broker,
        // which does not compile for a non-void method.
        return brokerService;
   }

    /**
     * Creates the {@link JmsTemplate} used by producers.
     *
     * <p>The template is wired to {@code connectionFactory()}, has explicit QoS
     * enabled and uses {@code DeliveryMode.PERSISTENT} as its delivery mode.
     *
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(){
        final JmsTemplate persistentTemplate = new JmsTemplate();
        persistentTemplate.setConnectionFactory(connectionFactory());

        // Quality-of-service settings: explicit QoS on, persistent delivery.
        persistentTemplate.setExplicitQosEnabled(true);
        persistentTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);

        return persistentTemplate;
    }
    
    /**
     * Creates the JMS connection factory: an {@code ActiveMQConnectionFactory}
     * wrapped in a {@code CachingConnectionFactory}.
     *
     * <p>The factory connects over the VM transport to the embedded broker and
     * never creates one itself ({@code create=false}), so the broker bean must be
     * initialised first. The broker name in the URL and the bean name in
     * {@code @DependsOn} are aligned with the visible broker definition
     * ({@code brokerServiceConfig()}, broker name {@code order-broker}); the
     * original referred to {@code brokerService} and {@code my-broker}.
     *
     * <p>NOTE(review): this method is {@code static}; Spring does not appear to
     * intercept direct calls to static {@code @Bean} methods, so each
     * {@code connectionFactory()} call from a sibling bean method presumably
     * builds a separate factory. Confirm against the Spring {@code @Bean} docs.
     *
     * @return the caching connection factory
     */
    @Bean
    @DependsOn({"brokerServiceConfig"})
    private static ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        // Must name the broker configured by brokerServiceConfig(); with
        // create=false a mismatched name cannot fall back to a new broker.
        activeMQConnectionFactory.setBrokerURL("vm://order-broker?create=false");

        // Extend, rather than replace, the default trusted packages.
        List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
        trustedPackageList.add("com.mypackages");
        activeMQConnectionFactory.setTrustedPackages(trustedPackageList);

        activeMQConnectionFactory.setCopyMessageOnSend(false);
        // NOTE(review): async send may hide send failures for persistent
        // messages from the producer; confirm this is intended.
        activeMQConnectionFactory.setUseAsyncSend(true);

        CachingConnectionFactory connFactory = new CachingConnectionFactory();
        connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
        return connFactory;
    }

    /**
     * Creates the listener container that consumes {@code TEST_QUEUE} and
     * dispatches each message to {@code msgListener()}.
     *
     * <p>Sessions are non-transacted with {@code Session.AUTO_ACKNOWLEDGE}, and
     * the container may scale up to 25 concurrent consumers.
     *
     * @return the configured listener container
     */
    @Bean
    public DefaultMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory());
        dmlc.setSessionTransacted(false);
        dmlc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        dmlc.setDestinationName(TEST_QUEUE);
        dmlc.setMaxConcurrentConsumers(25);
        dmlc.setMessageListener(msgListener());
        // The original returned an undefined name ("container").
        return dmlc;
    }       
    
    /**
     * Exposes the message-driven POJO as a bean.
     *
     * @return a new {@code TestMsgListener}
     */
    @Bean
    public TestMsgListener msgListener() {
        final TestMsgListener listener = new TestMsgListener();
        return listener;
    }

    // MessageProducer code -
    /** Injected template that {@code sendMessage()} uses to publish messages. */
    @Autowired
    private JmsTemplate jTemplate;
    
    /** Injected destination that {@code sendMessage()} publishes to. */
    @Autowired
    private ActiveMQQueue testQueue;

    /**
     * Builds a {@code MySerializedObject} and sends it to {@code testQueue}
     * through the injected {@code JmsTemplate}.
     *
     * <p>The method is best-effort and never throws. Failures are now logged
     * at SEVERE level; previously they were discarded, so a failed send could
     * not be distinguished from a successful one.
     */
    public void sendMessage() {
        try {
            MySerializedObject obj = <code to create new object>;
            jTemplate.convertAndSend(this.testQueue, obj);
        } catch (Throwable e) {
            // Deliberately broad to keep the original never-throws contract,
            // but the failure must at least be visible.
            java.util.logging.Logger.getLogger(getClass().getName())
                    .log(java.util.logging.Level.SEVERE,
                            "Failed to send message to " + this.testQueue, e);
        }
    }

    // MessageListener code -
    public class TestMsgListener implements MessageListener {

    /** Injected converter that {@code onMessage} uses to turn a JMS message into an object. */
    @Autowired
    public MessageConverter converter;

    public final void onMessage(Message message) {
        try {
             MySerializedObject obj = 
             (MySerializedObject)converter.fromMessage(message));

        } catch (Throwable e) {
          // log error.
        }
    }
 
4

0 回答 0