我正在尝试为我的 JMS 消息创建一个临时消费者(我使用 ActiveMQ)
它看起来像这样:
jmsTemplate.browse(
q.getName(),
new BrowserCallback<Integer>() {
@Override
public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {
Queue destination = session.createQueue(q.getName());
Enumeration<?> enum1 = browser.getEnumeration();
while (enum1.hasMoreElements()) {
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) enum1.nextElement();
MessageConsumer consumer = session.createConsumer(destination);
Message m = consumer.receiveNoWait();
handle(m);
m.acknowledge();
这个临时消费者应该处理所有失败的消费消息。问题是我在 spring-messaging.xml 中定义的原始 2-3 个消费者不断尝试处理失败的事件并通过重新传递配置重试(重新传递延迟设置为 3 秒,并且重新传递的数量是无限的)
这个消费者应该处理这些消息,但实际上正在挨饿(根本不会收到这些消息,因此
Message m = consumer.receiveNoWait();
一直返回null。
这是我的豆子:
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="queue" value="*" />
<property name="initialRedeliveryDelay" value="0" />
<property name="redeliveryDelay" value="2000" />
<property name="maximumRedeliveries" value="-1" />
</bean>
<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="redeliveryJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="redeliveryCachingConnectionFactory" />
<property name="messageConverter" ref="eventConverter" />
<property name="sessionTransacted" value="true" />
</bean>
ps 当我将 p:sessionCacheSize 的配置更改为 1 时:
<bean id="redeliveryCachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="redeliveryConnectionFactory"
p:sessionCacheSize="1" />
它有效,但我想使用缓存。
有任何想法吗?