我是 jms 的新手。目标是在异步侦听器的 onMessage 方法中同时处理来自队列的消息,方法是将侦听器实例附加到多个使用者,每个使用者使用自己的会话并在单独的线程中运行,这样消息就会传递给不同的使用者并发处理。
1)是否可以通过创建多个消费者同时处理来自单个队列的消息?2) 我想出了下面的代码,但想了解一下下面的代码是否适合我想要完成的任务。
public class QueueConsumer implements Runnable, MessageListener {
public static void main(String[] args) {
QueueConsumer consumer1 = new QueueConsumer();
QueueConsumer consumer2 = new QueueConsumer();
try {
consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
} catch (JMSException ex) {
ex.printStackTrace();
System.exit(-1);
}
Thread newThread1 = new Thread(consumer1);
Thread newThread2 = new Thread(consumer1);
newThread1.start();
newThread2.start();
}
private static String connectionFactoryName = null;
private static String queueName = null;
private static ConnectionFactory qcf = null;
private static Connection queueConnection = null;
private Session ses = null;
private Destination queue = null;
private MessageConsumer msgConsumer = null;
public static final Logger logger = LoggerFactory
.getLogger(QueueConsumer.class);
public QueueConsumer() {
super();
}
public void onMessage(Message msg) {
if (msg instanceof TextMessage) {
try {
//process message
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public void run() {
try {
queueConnection.start();
} catch (JMSException e) {
e.printStackTrace();
System.exit(-1);
}
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
try {
wait();
} catch (InterruptedException ex) {
break;
}
}
}
}
public void init(String factoryName, String queue2) throws JMSException {
try {
qcf = new JMSConnectionFactory(factoryName);
queueConnection = qcf.createConnection();
ses = queueConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
queue = ses.createQueue(queue2);
logger.info("Subscribing to destination: " + queue2);
msgConsumer = ses.createConsumer(queue);
msgConsumer.setMessageListener(this);
System.out.println("Listening on queue " + queue2);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
private static void setConnectionFactoryName(String name) {
connectionFactoryName = name;
}
private static String getQueueName() {
return queueName;
}
private static void setQueueName(String name) {
queueName = name;
}
}