3

要求:我希望消息在队列中持续存在直到onMessage()成功执行。如果在执行期间发生任何异常onMessage()并且如果它未被处理,则应将消息重新传递给侦听器。

我将 Glassfish v2 作为应用程序服务器。我正在使用 OpenMQConnectionFactory 和 JmsTemplate 在队列上发送消息。 请注意,我没有使用 MDB。

<bean id="openMQConnectionFactory"
    class="com.is.br.util.OpenMqConnectionFactoryBean">
    <property name="imqAddressList" value="mq://localhost:7676" />
    <property name="imqDefaultUsername" value="admin" />
    <property name="imqDefaultPassword" value="admin" />
</bean>

我尝试将 AUTO_ACKNOWLEDGE 作为确认模式,但在未重新传递异常引发的消息时在侦听器中。

消息生产者.java

公共无效发送消息(最终字符串响应流){

    System.out.println("Enter into IsJmsProducer.sendMessage method");

    try {
        MessageCreator creator = new MessageCreator() {
            public Message createMessage(Session session) {
                ObjectMessage message = null;
                try {
                    message = session.createObjectMessage(responseStream);
                    } catch (Exception e) {
                    System.out.println("Unable create a JMSMessage");
                }
                return message;
            }
        };

        System.out.println("Sending message to destination: " + this.destination.toString());
        this.jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

        this.jmsTemplate.send(this.destination, creator);
        System.out.println("SendMessage to queue successfully.");           
    } catch (Exception ex) {
        System.out.println("SendMessage to queue Fail." + ex);
    }
    System.out.println("Exit from IsJmsProducer.sendMessage method");

}

SampleJMSConsumer.java

public class SampleJMSConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        throw new RuntimeException();
   }
}

然后我尝试this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);在我调用的侦听器中使用并在我调用 message.acknowledge();catch块中调用session.recover()仍然消息未重新传递。

SampleJMSConsumer.java

public class SampleJMSConsumer implements MessageListener  {

    @Override
    public void onMessage(Message message) {

        ObjectMessage objectMessage = (ObjectMessage) message;
        Object object;
        try {
            object = objectMessage.getObject();
            if (object instanceof String) {
                System.out.println("Message received - " + object.toString());
                throw new JMSException("JMS exception");
            }
            message.acknowledge();
        } catch (JMSException e) {
               session.recover();
        }
    }

}

当我在调试模式下运行程序并在代理管理控制台中的队列上发送消息时,我能够看到消息的数量,但只要调用 onMessage() 的消息数量减少一。这意味着消息被消费并从队列中删除。该消息是否被视为“已送达”?请帮助我理解为什么发生异常时消息没有重新传递?

提前致谢。

4

4 回答 4

1

我认为这是设计使然,在调用 onmessage 时交付。如果你想对异常做点什么,你可以使用 try catch 来处理它。

假设消息再次被放入队列,无论如何在消费时您可能会遇到相同的异常。

ack 机制应该是关于确保正确交付的。也许你所追求的是一个拒绝机制,你要求 prpoducerside 发送一条新消息?

于 2012-07-16T06:42:44.713 回答
1

客户确认适合您。在您的 onMessage() 方法中,一旦处理结束,您需要调用 Acknowledge,否则如果有任何异常,则不要调用 Acknowledge()。

Session.Recovery() 停止并重新启动消息传递。消息的传递将从最后一个未确认的消息开始。

于 2012-07-16T07:23:55.287 回答
0

在消费者中创建的会话应将会话模式设置为 AUTO_ACK / DUPS_OK_ACK。您尚未共享启动消费者的代码。您在生产者而不是消费者中设置会话模式。

于 2014-08-20T13:14:09.947 回答
0

我建议验证 OpenMQ 的默认会话模式是什么。可能发生的情况是,一旦打开连接,就无法更改它,因此必须在打开连接时指定。

于 2012-07-16T07:06:55.247 回答