1

大家好,我无法使用活动 mq 接收异步消息,下面是我用于发布消息和订阅的代码。

    public class publishMessage extends HttpServlet {
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

       InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) envContext.lookup("jms/ConnectionFactory");
            Connection connection = connectionFactory.createConnection();
            connection.start() ;
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("jms/topic/MyQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT) ;
            TextMessage msg=session.createTextMessage();
            msg.setText("Message sent");
                    System.out.println("Message Sent");
            producer.send(msg);
            session.commit() ;
            connection.close() ;
            }catch(Exception ex){
                ex.printStackTrace() ;
            }
    }}


class Consumer{

    protected Queue queue;


    protected String queueName = "jms/topic/MyQueue";

    protected String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    protected int ackMode = Session.AUTO_ACKNOWLEDGE;

    public static void main(String[] args) {
        Consumer rec=new Consumer();
        try {
            rec.run();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void run() throws JMSException{

        System.out.println("URL:" + url);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        TopicConnection connection = (TopicConnection)connectionFactory.createTopicConnection();
        connection.setClientID("Testingconn1") ;
        connection.start();
        MessageConsumer consumer = null;
        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        queue = session.createQueue(queueName) ;
        consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MyListener()) ;
        session.commit() ;
        consumer.close();
        session.close();
        connection.close();
    }

}

package com.java;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListener implements MessageListener{

    @Override
    public void onMessage( final Message message )
    {
        if ( message instanceof TextMessage )
        {
            final TextMessage textMessage = (TextMessage) message;
            try
            {
                System.out.println("Listener:" +  textMessage.getText() );
            }
            catch (final JMSException e)
            {
                e.printStackTrace();
            }
        }

    }
}

当我执行上述代码时,发布者成功发送了一条消息,当我启动消费者时,输出按要求出现,没有输出进入我的控制台。

请帮我解决它或建议一个代码来接收异步消息。

4

2 回答 2

1

您的消费者代码实际上并没有给消费者任何时间来消费消息。当消费者将存在一段时间并且您的应用程序可以处理传入消息时,将使用异步消费者。在上面的代码中,您最好使用 MessageConsumer 的定时接收方法,例如 consumer.receive(5000) 或一些适合您应用的时间段。您的消费者不能保证在创建后立即收到消息,因为代理需要时间来注册消费者并将消息路由到它,因此您必须在您的应用程序中允许这样做。

于 2012-09-28T10:28:26.123 回答
1

Your consumer class is correct. It runs smoothly.

But, you consumer is incorrect & you have to modify it.

  • First, add setClientID("any_string_value") after creating connection object;

     eg: Connection connection = connectionFactory.createConnection();
        // need to setClientID value, any string value you wish
        connection.setClientID("12345");
    
    • secondly, use createDurableSubscriber() method instead of createConsumer() for transmitting message via topic.

       MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");
      

Here is the modified comsumer class:

package mq.test;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();

        // need to setClientID value, any string value you wish
        connection.setClientID("12345");

        try{
        connection.start();
        }catch(Exception e){
            System.err.println("NOT CONNECTED!!!");
        }
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("test_data");

        //need to use createDurableSubscriber() method instead of createConsumer() for topic
        // MessageConsumer consumer = session.createConsumer(topic);
        MessageConsumer consumer = session.createDurableSubscriber(topic,
                "SUB1234");

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };

        consumer.setMessageListener(listner);
        //connection.close();

    }
}
于 2014-08-29T10:34:57.547 回答