9

我正在尝试使用 Spring JMSTemplate.receive(String) 方法以同步模式从队列中获取所有消息。

问题是我总是只收到一条消息。这是代码:

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
  List<Message> messages = new ArrayList<Message>();
  Message message;
  while ((message = queueJmsTemplate.receive(destination)) != null) {
    messages.add(message);
  }
  return messages;
}

如果我删除 @Transactional 注释,我会收到所有消息,但所有消息都是在事务之外完成的,因此如果稍后在处理这些消息期间出现异常,消息将丢失。

这是我的 JMSTemplate bean 的定义。

<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="pubSubDomain" value="false" />
    <property name="receiveTimeout" value="1" />
   <property name="sessionTransacted" value="true" />
</bean>

我想要实现的是有一个事务,并且在这个事务中我想获取所有待处理的消息。

4

2 回答 2

7

JmsTemplate 的 receive 方法每次都会创建一个新的 MessageConsumer。第二次,您的事务尚未提交,Spring 将在第一次接收期间预取一些消息。那时没有消息要提取,导致您的接听电话为空。

Spring 中的 JmsTemplate 有一个以 SessionCallback 作为参数的执行方法。这允许您针对 JmsTemplate 的底层会话运行自己的代码。只创建一个 MessageConsumer 应该可以解决您的问题。

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
    return jmsTemplate.execute(session -> {
        try (final MessageConsumer consumer = session.createConsumer(session.createQueue(destination))) {
            List<Message> messages = new ArrayList<>();
            Message message;
            while ((message = consumer.receiveNoWait()) != null) {
                messages.add(message);
            }
            return messages;
        }
    }, true);
}
于 2018-09-27T13:56:17.487 回答
5

我会回复自己。看起来 JMSTemplate 不支持它。暂时解决它的唯一方法是扩展 JMSTemplate 并添加使用部分 JMSTemplate 的新方法。不幸的是,有些方法是私有的,所以它们需要被复制......

public class CustomQueueJmsTemplate extends JmsTemplateDelegate {

  public List<Message> receiveAll(String destinationName) {
    return receiveAll(destinationName, null);
  }

  public List<Message> receiveAll(final String destinationName, final String messageSelector) {
    return execute(new SessionCallback<List<Message>>() {
      @Override
      public List<Message> doInJms(Session session) throws JMSException {
        Destination destination = resolveDestinationName(session, destinationName);
        return doReceiveAll(session, destination, messageSelector);
      }
    }, true);
  }

  private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
      throws JMSException
  {
    return doReceiveAll(session, createConsumer(session, destination, messageSelector));
  }

  private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
    try {
      // Use transaction timeout (if available).
      long timeout = getReceiveTimeout();
      JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
          .getResource(getConnectionFactory());
      if (resourceHolder != null && resourceHolder.hasTimeout()) {
        timeout = resourceHolder.getTimeToLiveInMillis();
      }

      // START OF MODIFIED CODE
      List<Message> messages = new ArrayList<>();
      Message message;
      while ((message = doReceive(consumer, timeout)) != null) {
        messages.add(message);
      }
      // END OF MODIFIED CODE

      if (session.getTransacted()) {
        // Commit necessary - but avoid commit call within a JTA transaction.
        if (isSessionLocallyTransacted(session)) {
          // Transacted session created by this template -> commit.
          JmsUtils.commitIfNecessary(session);
        }
      } else if (isClientAcknowledge(session)) {
        // Manually acknowledge message, if any.
        for (Message retrievedMessages : messages) {
          retrievedMessages.acknowledge();
        }
      }
      return messages;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumer);
    }
  }

  private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
    if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
      return consumer.receiveNoWait();
    } else if (timeout > 0) {
      return consumer.receive(timeout);
    } else {
      return consumer.receive();
    }
  }

}
于 2014-03-26T13:44:58.080 回答