9

ActiveMQ用来发送消息。

所以当我发送消息时,消息来接收消息。在成功插入时,它被确认。

但是我有确认后的代码,可以抛出NullPointerException.

所以为了故意产生那个异常,我抛出了NullPointerException. 所以当它这样做时:

消息不是dequeued,同样的消息再次出现在onMessage函数中。

我的代码是:

public void onMessage(Message message) {
    String msg = null;
    try
    {
        msg = receiveMessage(message);

        // Other code to insert message in db

        message.acknowledge();

        if(true)
        {
            throw new NullPointerException("npe"));
        }
            ** // Other code which might produce a null pointer exception **
        }
        catch(Exception ex)
        {
        }
    }

为什么该消息再次onMessage()像我一样发挥作用acknowledge()

因为我已经在 db.xml 中插入了消息。

队列中的消息不会被删除acknowledge()吗?

我怎样才能做到这一点?

4

4 回答 4

4

您对消息侦听器使用 AUTO 确认模式,然后根据规范,如果消息侦听器未能成功返回(例如,如果抛出异常),则会重新传递消息。

在您的情况下,您正在尝试手动确认消息,但使用创建的会话是不可能的createSession(false, Session.AUTO_ACKNOWLEDGE)

您的代码可以与Session.CLIENT_ACKNOWLEDGE.

否则,您希望在使用 AUTO_ACKNOWLEDGE 时捕获 onMessage 方法中的异常。

要对您的消息进行更细粒度的控制,请考虑使用事务处理会话并用于session.commit();确认消息已被阅读。

于 2012-09-14T20:50:40.847 回答
3

您是否检查过您没有使用事务处理会话?当使用事务会话时,确认模式被忽略,所以:

  • message.acknowledge()将实际上是一个无操作

  • 在转义消息侦听器时,您未捕获的异常将触发“会话回滚”,从而强制重新传递消息。

注意:您发布的代码有一个catch (Exception ex) { },所以我不知道您的异常是如何逃到外面的。

于 2012-09-18T21:29:20.093 回答
2

您可以创建一个单独的方法来处理消息,我的意思是在onMessage()函数中编写仅将该消息插入数据库的代码。

并创建一个单独的函数来处理该消息。

这样,如果您在处理过程中遇到任何错误,该消息将不会onMessage()再次出现。

于 2012-09-24T15:08:52.053 回答
0

当您使用事务处理 JMS 确认模式时,您的消息将被 JMS-listener 多次接收(在 AMQ 中默认约为 8 次),直到被无异常处理或被 JMS-container 移动到 DQL-queue。有关详细信息,请参阅消息重新传递和 DLQ 处理

管理事务取决于您使用的框架。我更喜欢使用Spring Framework,所以我的 Spring XML 配置如下所示:

<jms:listener-container container-type="default"
                        connection-factory="calendarConnectionFactory"
                        acknowledge="transacted"
                        destination-type="queue"
                        cache="consumer"
                        concurrency="1-5">
    <jms:listener destination="${jms.calendar.destination}" ref="calendarListener"/>
</jms:listener-container>

我的消息监听器的 Java 代码是

@Override
@Transactional(propagation = Propagation.REQUIRED,
               noRollbackFor =
                 {ClassCastException.class, IllegalArgumentException.class})
public void onMessage(Message message) {
 ....
}

所以我可以管理哪些异常会回滚事务。

于 2012-09-14T15:51:55.173 回答