4

我似乎找不到在 ActiveMQ(Java 版本)中侦听新的生产者和消费者连接(或连接中断)的方法。我希望能够告诉消费者(或者他们可以自己发现)生产者的连接断开了。反过来(生产者发现某个消费者断开连接)也是必需的。

我会很感激一些帮助。

4

2 回答 2

4

我认为您想在特定目的地(特定队列或主题)上监听新的生产者和消费者。那正确吗?

您可以实例化 ConsumerEventSource 和 ProducerEventSource,并通过分别调用它们的 setConsumerListener 和 setProducerListener 来注册自己的侦听器。

所以:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

如果您查看 ConsumerEventSource 或 ProducerEventSource 的代码,您会发现它们是简单的对象,它们使用 AdvisorySupport 的方法来监听一个特殊的咨询主题,其业务是广播有关生产者和消费者的新闻。您可以通过阅读这些类的源代码来了解更多信息。

您对“连接”的使用可能存在问题;在 ActiveMQ 领域(它是 JMS 领域的一个子集)中,“连接”是与特定目的地无关的较低级别的对象。一个特定的客户端从一个连接创建一个“会话”——仍然不是特定于一个目标——然后创建一个特定于目标的 QueueSender、QueueReceiver、TopicPublisher 或 TopicSubscriber。当这些事件被创建时,或者当创建它们的会话终止时,这些是您想要听到的事件,并且如果您使用上面的代码将会听到。

于 2009-11-09T21:27:02.613 回答
2

我需要的所有信息都发布在 ActiveMQ 咨询主题中,例如“ActiveMQ.Advisory.Connection”或简单的“ActiveMQ.Advisory..>”。甚至在 Stomp Connection 中发生的事件也会发布在 ActiveMQ 咨询主题中。以下代码给出了一个示例(使用通过 Stomp 连接的 Flex 客户端进行测试):

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});
于 2009-11-12T11:46:00.473 回答