5

我想构建一个简单的消费者程序(在 java 中)来获取存储在 ActiveMQ 主题中的所有消息。我有一个在队列中发送 TextMessage 的生产者。

但我不知道如何开始编写我的消费者来检索旧消息并等待新消息。

如果有例子,谢谢!

这是我的制片人:http: //pastebin.com/uRy9D8mY

这是我的消费者: http: //pastebin.com/bZh4r66e

当我在消费者之前运行生产者,然后运行消费者时,我什么也没有。当我运行我的消费者和生产者时,我在队列中添加了 72 条消息,但我的消费者只收到了 24 条消息......

4

2 回答 2

5

我建议阅读本教程(就像 Apache ActiveMQ 一样)SUN Jms 教程

编写 JMS/ActiveMQ 程序的方法有很多,可以使用 Spring 等各种框架,也可以使用纯 java。

本质上,编写一个像这样的监听器类:

public class MyListener implements MessageListener{
   public void onMessage(Message message){
      // Read and handle message here.
   }
}

由于您已经在生成消息,我假设您已建立并运行连接。

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer("MyQueue");
listener = new MyListener ();
consumer.setMessageListener(listener);
connection.start(); 
// At this point, messages should arrive from the queue to your listener.

然后有一些错误处理代码没有包含在这个例子中,但是你应该能够在教程和 JMS 文档的帮助下弄清楚。

于 2012-05-29T08:11:55.850 回答
2

使用下面给出的代码,您可以读取队列中排队的所有消息。

  • 在这段代码中,while 循环是一个无休止的循环,它将迭代队列中的所有消息。
  • 一旦队列中没有消息,它将等待 5 秒,然后自动停止连接并中断循环。

如果您需要一个无休止的消费者,它将在新添加到队列时读取所有消息,然后删除 else 部分,这样程序就不会终止。

    ConnectionFactory factory =  new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection con = factory.createConnection();
    Session session =   con.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("tmp_queue2");
    MessageConsumer consumer = session.createConsumer(queue);
    con.start();     
    while (true) {     
        Message msg = consumer.receive(5000); 
        if (msg instanceof TextMessage) {
            TextMessage tm = (TextMessage) msg;
            System.out.println(tm.getText());       
        }
        else{
            System.out.println("Queue Empty"); 
            con.stop();
            break;
        }
    }

希望这个消费者计划能帮助那些刚接触 ActiveMQ 的人。

于 2016-09-13T12:44:59.967 回答