8

We are facing a random issue with ActiveMQ and its consumers. We observe that, few consumers are not receiving messages, even though they are connected to ActiveMQ queue. But it works fine after the consumer restart.

We have a queue named testQueue at ActiveMQ side. A consumer is trying to de-queue the messages from this queue. We are using Spring's DefaultMessageListenerContainer for this purpose. Message is being delivered to the consumer node from ActiveMQ Broker. From the tcpdump as well, it was obvious that, message is reaching the consumer node, But the actual consumer code is not able to see the message. In other words, the message seems to be stuck either in ActiveMQ consumer code or in Spring’s DefaultMessageListenerContainer.

See refer to the below fig. for more clarity on the issue. Message is reaching Consumer node, but it is not reaching the “Actual Consumer Class”, which means that the message got stuck either in AMQ consumer code or Spring DMLC.

enter image description here

Below are the details captured from ActiveMQ admin.

Queue-Name /Pending-Message-Count /Consumer-Count /Messages-Enqueued /Messages-Dequeued testQueue /9 /1 /9 /0

Below are the more details.

Connection-ID /SessionId /Selector /Enqueues /Dequeues /Dispatched /Dispatched-Queue /Prefetch ID:bearsvir52-45176-1375519181268-3:5 /1 / /9 /0 /9 /9 /250

From the second table it is obvious that, messages are being delivered to the consumer, but the consumer is not acknowledging the message. Hence the messages are stuck in Dispatched-Queue at broker side.

Few points for to your notice:

1)There is no time difference b/w Broker node and consumer node.

2)Observed the tcpdump at consumer side. We can see MessageDispatch(Openwire) packet being transferred to consumer node, But could not find the MessageAck(Openwire) for the same.

3)Sometimes it is working on a node, and sometimes it is creating problem on the same node.

4

2 回答 2

2

花了很多时间来找出解决方案。org.apache.activemq.ActiveMQConnection.java类似乎存在一些问题,以防 AMQ 故障转移。在这种情况下,连接对象不会在消费者端启动。

以下是我在 ActiveMQConnection.java 文件中添加的修复程序并编译了源以创建activemq-core-xxxjar

private final Object startMutex = new Object();

在 createSession 方法中添加了一个检查

public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    synchronized (startMutex) {
        if(!isStarted()) {
            start();
        }
    }
于 2015-02-28T03:55:47.270 回答
2

造成这种情况的一个原因可能是错误地将CachingConnectionFactory(带有缓存的消费者)与动态调整消费者(最大消费者>消费者)的侦听器容器一起使用。您最终可能会得到一个缓存的消费者,它只是坐在池中而没有被积极使用。您永远不需要使用侦听器容器缓存消费者。

对于此类问题,我通常建议使用 TRACE 日志记录运行,您可以看到所有消费者活动。

于 2013-08-12T12:52:40.903 回答