2

我是 jms 的新手。目标是在异步侦听器的 onMessage 方法中同时处理来自队列的消息,方法是将侦听器实例附加到多个使用者,每个使用者使用自己的会话并在单独的线程中运行,这样消息就会传递给不同的使用者并发处理。

1)是否可以通过创建多个消费者同时处理来自单个队列的消息?2) 我想出了下面的代码,但想了解一下下面的代码是否适合我想要完成的任务。

public class QueueConsumer implements Runnable, MessageListener {

public static void main(String[] args) {




    QueueConsumer consumer1 = new QueueConsumer();
    QueueConsumer consumer2 = new QueueConsumer();
    try {
        consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
        consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
    } catch (JMSException ex) {
        ex.printStackTrace();
        System.exit(-1);
    }


    Thread newThread1 = new Thread(consumer1);
    Thread newThread2 = new Thread(consumer1);
    newThread1.start();
    newThread2.start();



}


private static String connectionFactoryName = null;
private static String queueName = null;


private static ConnectionFactory qcf = null;
private static Connection queueConnection = null;


private Session ses = null;
private Destination queue = null;
private MessageConsumer msgConsumer = null;

public static final Logger logger = LoggerFactory
        .getLogger(QueueConsumer.class);

public QueueConsumer() {
    super();
}

public void onMessage(Message msg) {
    if (msg instanceof TextMessage) {
        try {

            //process message

        } catch (JMSException ex) {
            ex.printStackTrace();

        }
    }

}

public void run() {

    try {
        queueConnection.start();
    } catch (JMSException e) {

        e.printStackTrace();

        System.exit(-1);
    }
    while (!Thread.currentThread().isInterrupted()) {
        synchronized (this) {
            try {
                wait();
            } catch (InterruptedException ex) {
                break;
            }
        }
    }

}



public void init(String factoryName, String queue2) throws JMSException {
    try {

        qcf = new JMSConnectionFactory(factoryName);


        queueConnection = qcf.createConnection();


        ses = queueConnection.createSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        queue = ses.createQueue(queue2);
        logger.info("Subscribing to destination: " + queue2);

        msgConsumer = ses.createConsumer(queue);


        msgConsumer.setMessageListener(this);

        System.out.println("Listening on queue " + queue2);

    } catch (Exception e) {
        e.printStackTrace();
        System.exit(-1);
    }

}

private static void setConnectionFactoryName(String name) {
    connectionFactoryName = name;
}

private static String getQueueName() {
    return queueName;
}

private static void setQueueName(String name) {
    queueName = name;
}

}

4

3 回答 3

2
  1. 是的,一点没错
  2. 我只看了一眼,我注意到你将错误的消费者传递给你的第二个线程:

    Thread newThread2 = new Thread(consumer1); // has to pass consumer2
    

    除此之外,一些变量,例如ConnectionFactory静态变量,多次初始化/覆盖。您只需要一个可以创建多个会话和/或消费者的连接。

于 2013-06-13T07:01:09.020 回答
0

与您提供的代码示例相关,Oracle 不建议在已部署的应用程序上创建低级线程。Weblogic 示例: 在 WebLogic Server 中使用线程

于 2015-04-08T15:50:01.347 回答
0

相反,在您制作邮件容器 bean 的 applicationcontext.xml 中,您可以添加并发使用者属性,这将是一种更好的方法。

<bean id="jmsMailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers">
    <value>100</value>
</property>
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="mailDestination"/>
<property name="messageListener" ref="jmsMailConsumer"/>

于 2018-07-30T09:24:16.713 回答