0

我正在使用 JMS 在 Java 1.8 SE 环境中处理消息。消息源自 Oracle 高级队列。因为处理一条消息可能需要一段时间,所以我决定有一个由 5 个工作线程(MessageHandler 对象)组成的池,这样就可以一次处理多个线程。我想保证没有重复的消息传递。

我用

queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);

创建 QueueSession。我使用下面的代码来处理传入的消息。基本上,onMessage产生一个处理消息的线程。

public class JmsQueueListener implements MessageListener
{
    /** A pool of worker threads for handling requests. */
    private final ExecutorService pool;

    OracleJmsQueue queue;

    public void onMessage(Message msg)
    {
        pool.execute(new MessageHandler(msg));
        // can't commit here - the thread may still be processing
    }

    /**
     * This class provides a "worker thread" for processing a message
     * from the queue.
     */
    private class MessageHandler implements Runnable {

        /**
         * The message to process
         */
        Message message;

        /**
         * The constructor stores the passed in message as a field
         */
        MessageHandler(Message message) {
            this.message = message;
        }

        /**
         * Processes the message provided to the constructor by
         * calling the appropriate business logic.
         */
        public void run() {
            QueueSession queueSession = queue.getQueueSession();
            try {
                String result = requestManager.processMessage(message);

                if (result != null) {
                    queueSession.commit();
                }
                else {
                    queueSession.rollback();
                }
            }
            catch (Exception ex) {
                try {
                    queueSession.rollback();
                }
                catch (JMSException e) {
                }
            }
        }
    }   //  class MessageHandler

我的问题是我不知道如何向原始队列指示处理是否已成功完成。我无法在 结束时提交onMessage,因为线程可能尚未完成处理。我也不认为我目前拥有commits 和rollbacks 的地方有什么好处。例如,如果 5 个工作线程处于不同的完成状态,那么正在提交的队列会话的状态是什么?

我想我一定错过了一些关于如何以多线程方式处理 JMS 的基本概念。任何帮助将非常感激。

4

1 回答 1

1

您正在使用异步消息处理,因此,除非您实施一种确保每个消息处理按时间顺序完成的方法,否则您最终会遇到旧消息处理在最近的消息处理之后完成的情况。那么为什么要使用消息服务呢?

您的问题的一个简单解决方案是commitonMessage方法结束时,并在您的正文中messageHandler重新排队消息以防出现错误。但是,如果重新入队本身失败,则此解决方案可能会出现问题。

于 2018-02-27T01:21:14.853 回答