0

我正在学习在我的项目中实现活动 mq 接口。这就是我创建生产者和消费者的方式。

public void connectionSetup(String portName) { // portname is object of PortTO class.   We are creating producer and consumer pair for every existing PortTO object.
            Connection connection = null; 
            try { 

                    if (timeToLive != 0) { 

                    } 

                    // Create the connection. 
                    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
                    connection = connectionFactory.createConnection(); 

                    connection.start(); 
                    connection.setExceptionListener(this); 

                    // Create the session 
                    Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 
                    if (topic) { 
                            destination = session.createTopic(subject); 
                    } else { 
                            destination = session.createQueue(portName); 
                    } 

                    // Create the producer. 
                    MessageProducer producer = session.createProducer(destination);                        if (persistent) { 
                            producer.setDeliveryMode(DeliveryMode.PERSISTENT); 
                    } else { 
                            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
                    } 


                    MessageConsumer consumer = session.createConsumer(destination);                        if (timeToLive != 0) 
                            producer.setTimeToLive(timeToLive); 

                    mapOfSession.put(portName, session); 
                    mapOfMessageProducer.put(portName, producer); 
                    mapOfMessageConsumer.put(portName, consumer);                        log.info("Producer is " + producer); 
                    log.info("Consumer is " + consumer); 

            } catch (Exception e) { 

                    log.error(e.getMessage()); 
            } 
    } 

因此,我们正在创建生产者和消费者,并将它们存储在每个 PortTO 对象的映射中。现在,生产者正在发送消息:

TextMessage message = session.createTextMessage(); 
 message.setIntProperty(key, 2); 
  producer.send(message);        

但是消费者并没有消费它...

        public void onMessage(Message message) { 
            PortService portService = new PortService();      
           List<PortTO> portTOList = portService.getMoxaPorts(); 
                    for(PortTO portTO : portTOList) {  // catching messages from producers of every PortTO object                                
              MessageConsumer consumer = DataCollectionMessageProducer.getMapOfMessageConsumer().get(portTO.getPort());  // getting consumer from map of PortTO
                            consumer.setMessageListener(this); 
                            message = consumer.receive(1000);                                if (message instanceof TextMessage) { 
            / / some processing 
                         } 
                            } else { 
                                    if (verbose) { 

                                    } 
                            } 

                    } 
                    } 

可能是什么原因?我的方法错了吗??

4

1 回答 1

0

您正在 onMessage 方法中设置 messageListener。这是第 22 个问题,因为仅当 messageListener 设置为该对象时才会调用 onMessage 方法。

另一件事,我不确定您为什么要在消息侦听器中接收。一旦将队列中的每条消息设置为侦听器,就会为队列中的每条消息调用 onMessage,并且每条接收到的消息的逻辑都应该以事件驱动的方式驻留在其中。至少,这首先是 JMS 的想法

于 2012-06-15T22:14:25.933 回答