我似乎找不到在 ActiveMQ(Java 版本)中侦听新的生产者和消费者连接(或连接中断)的方法。我希望能够告诉消费者(或者他们可以自己发现)生产者的连接断开了。反过来(生产者发现某个消费者断开连接)也是必需的。
我会很感激一些帮助。
我认为您想在特定目的地(特定队列或主题)上监听新的生产者和消费者。那正确吗?
您可以实例化 ConsumerEventSource 和 ProducerEventSource,并通过分别调用它们的 setConsumerListener 和 setProducerListener 来注册自己的侦听器。
所以:
Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {
public void onConsumerEvent(ConsumerEvent event) {
if (event.isStarted()) {
System.out.println("a new consumer has started - " + event.getConsumerId());
} else {
System.out.println("a consumer has dropped - " + event.getConsumerId());
}
}
});
如果您查看 ConsumerEventSource 或 ProducerEventSource 的代码,您会发现它们是简单的对象,它们使用 AdvisorySupport 的方法来监听一个特殊的咨询主题,其业务是广播有关生产者和消费者的新闻。您可以通过阅读这些类的源代码来了解更多信息。
您对“连接”的使用可能存在问题;在 ActiveMQ 领域(它是 JMS 领域的一个子集)中,“连接”是与特定目的地无关的较低级别的对象。一个特定的客户端从一个连接创建一个“会话”——仍然不是特定于一个目标——然后创建一个特定于目标的 QueueSender、QueueReceiver、TopicPublisher 或 TopicSubscriber。当这些事件被创建时,或者当创建它们的会话终止时,这些是您想要听到的事件,并且如果您使用上面的代码将会听到。
我需要的所有信息都发布在 ActiveMQ 咨询主题中,例如“ActiveMQ.Advisory.Connection”或简单的“ActiveMQ.Advisory..>”。甚至在 Stomp Connection 中发生的事件也会发布在 ActiveMQ 咨询主题中。以下代码给出了一个示例(使用通过 Stomp 连接的 Flex 客户端进行测试):
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
Object command = activeMessage.getDataStructure();
if (command instanceof ConsumerInfo) {
System.out.println("A consumer subscribed to a topic or queue: " + command);
} else if (command instanceof RemoveInfo) {
RemoveInfo removeInfo = (RemoveInfo) command;
if (removeInfo.isConsumerRemove()) {
System.out.println("A consumer unsubscribed from a topic or queue");
}
else {
System.out.println("RemoveInfo, a connection was closed: " + command);
}
} else if (command instanceof ConnectionInfo) {
System.out.println("ConnectionInfo, a new connection was made: " + command);
} else {
System.out.println("Unknown command: " + command);
}
}
}
});