我有一些架构问题。我开发了一个在 glassfish appserver 上运行的 wicket web-app,有一些 ejb 模块等。要创建聊天面板,我想使用某种 JMS,例如 Apache ActiveMQ 以避免连续轮询。我做了 2 种处理程序Sender
和Receiver
. 它们都有ConnectionFactory、Connecton、Session。聊天面板代表聊天室,每个聊天室都是一个 Topic。(当您使用选择的“房间”打开聊天面板时,您在逻辑上订阅了给定的主题。)面板实现了 MessageListener 接口,因此面板是 Receiver 对象中消费者的侦听器。
所以面板在他们的 Sender 对象中向他们的主题发送消息,并通过他们的 Receiver 对象来监听它们。(是的,每个面板也有一个Sender
和一个Receiver
对象)
这样,有 2 个连接/面板,根本没有人关心关闭这些连接。:(
我不认为这是使用activemq的好方法。我需要一些建议来实现这个功能。能否请教一下,高手是怎么做的?或者也许我必须去哪个方向?
(ps,AMQ 完全独立运行)
发件人,
public class Sender {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = "failover://(tcp://0.0.0.0:61616)?randomize=false";
private Destination destination;
private Connection connection = null;
private Session session = null;
private ActiveMQConnectionFactory connectionFactory;
private MessageProducer producer;
public Sender(String topicId) {
try {
connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connectionFactory.setDispatchAsync(false);
connection = connectionFactory.createTopicConnection(user, password);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(topicId);
producer = session.createProducer(destination);
} catch (JMSException ex) {
Logger.getLogger(Sender.class.getName()).log(Level.SEVERE, null, ex);
System.out.println(ex.toString());
}
}
public void sendMessage(String message) {
try {
BytesMessage message = session.createBytesMessage();
message.setLongProperty("text", message);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(destination, message);
} catch (JMSException ex) {
Logger.getLogger(Sender.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void close() {
try {
producer.close();
session.close();
connection.close();
} catch (JMSException ex) {
Logger.getLogger(Sender.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
接收者,
public class Receiver {
private String url = "failover://(tcp://0.0.0.0:61616)?randomize=false";
private ActiveMQConnectionFactory connectionFactory;
private Connection connection;
private Session session = null;
private Topic destination = null;
private TopicSubscriber consumer;
private IdGenerator clientIdGenerator = new IdGenerator();
public Receiver(String topicId, MessageListener listener) {
try {
connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setDispatchAsync(false);
connection = connectionFactory.createTopicConnection();
connection.setClientID(clientIdGenerator.generateId());
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(topicId);
consumer = session.createDurableSubscriber(destination, "subscriber");
consumer.setMessageListener(listener);
} catch (JMSException ex) {
Logger.getLogger(Receiver.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void close() {
try {
consumer.close();
session.close();
connection.close();
} catch (JMSException ex) {
onException(ex);
Logger.getLogger(Receiver.class.getName()).log(Level.SEVERE, null, ex);
}
}
}