0

我正在尝试为我的 JMS 消息创建一个临时消费者(我使用 ActiveMQ)

它看起来像这样:

 jmsTemplate.browse(
                          q.getName(),
                          new BrowserCallback<Integer>() {

                          @Override
                          public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {

                              Queue destination = session.createQueue(q.getName());
                              Enumeration<?> enum1 = browser.getEnumeration();

                              while (enum1.hasMoreElements()) {
                              ActiveMQObjectMessage msg = (ActiveMQObjectMessage) enum1.nextElement();
                              MessageConsumer consumer = session.createConsumer(destination);
                              Message m = consumer.receiveNoWait();
                              handle(m);
                              m.acknowledge();

这个临时消费者应该处理所有失败的消费消息。问题是我在 spring-messaging.xml 中定义的原始 2-3 个消费者不断尝试处理失败的事件并通过重新传递配置重试(重新传递延迟设置为 3 秒,并且重新传递的数量是无限的)

这个消费者应该处理这些消息,但实际上正在挨饿(根本不会收到这些消息,因此

          Message m = consumer.receiveNoWait();

一直返回null。

这是我的豆子:

    <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="queue" value="*" />
    <property name="initialRedeliveryDelay" value="0" />
    <property name="redeliveryDelay" value="2000" />        
    <property name="maximumRedeliveries" value="-1" />
</bean>

    <!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="redeliveryJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="redeliveryCachingConnectionFactory" />
    <property name="messageConverter" ref="eventConverter" />
    <property name="sessionTransacted" value="true" />
</bean>

ps 当我将 p:sessionCacheSize 的配置更改为 1 时:

    <bean id="redeliveryCachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory"
    p:targetConnectionFactory-ref="redeliveryConnectionFactory"
    p:sessionCacheSize="1" />

它有效,但我想使用缓存。

有任何想法吗?

4

1 回答 1

0

您假设队列浏览器正在返回可消耗的消息。浏览的消息就像浏览完成那一刻的快照。它将包括分配给预取的任何消息,或者当您再次访问它时,它可能已被使用。此外,您不会等待分配消息,因此您会不断地排在队列后面以获取消息。

我建议允许将消息移动到 DLQ,然后允许该 Adhoc 消费者从中消费。否则,您将与其他消费者竞争消息。

如果您真的坚决反对使用 DLQ,您可以使用JMS 选择器根据JMSRedelivered标头为真进行消费,等待时间更长。您仍将与其他消费者竞争,但会在某个时候收到消息。

于 2014-12-09T16:53:18.950 回答