1

我写了一个连续 JMS 消息接收器:在这里,我使用 CLIENT_ACKNOWLEDGE 因为我不希望这个线程确认消息。

(...)
connection.start();
session = connection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue(QueueId);
receiver = session.createReceiver(queue);
While (true) {
  message = receiver.receive(1000);
  if ( message != null ) {
    // NB : I can only pass Strings to the other thread
    sendMessageToOtherThread( message.getText() , message.getJMSMessageID() ); 
  }
  // TODO Implement criteria to exit the loop here
}

在另一个线程中,我将执行以下操作(成功处理后):
这是在一个不同的 JMS 连接中同时执行的。

public void AcknowledgeMessage(String messageId) {
  if (this.first) {
    this.connection.start();
    this.session = this.connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE );
    this.queue = this.session.createQueue(this.QueueId);
  }
  QueueReceiver receiver = this.session.createReceiver(this.queue, "JMSMessageID='" + messageId + "'");
  Message AckMessage = receiver.receive(2000);
  receiver.close();
}

似乎找不到消息(超时后 AckMessage 为空),而它确实存在于队列中。我怀疑消息被连续输入线程阻塞了。确实,当单独触发 AcknowledgeMes​​sage() 时,它工作正常。

有没有更简洁的方法来检索 1 条消息?基于它的 QueueId 和 messageId另外,如果连续阅读器必须长时间记住消息或 ID
,我觉得可能存在内存泄漏的风险.. 合理吗?

如果我使用 aQueueBrowser来避免影响确认线程,看起来我不能有这个连续的输入提要.. 对吗?

更多上下文:我正在使用ActiveMQ,这 2 个线程是 Pentaho Kettle 转换的 2 个自定义“步骤”
注意:代码示例被简化以专注于该问题。

4

1 回答 1

2

好吧,您不能两次阅读该消息,因为您已经在第一个线程中阅读了它。

ActiveMQ 不会删除该消息,因为您尚未确认它,但在您断开 JMS 连接之前它不会可见(我不确定 ActiveMQ 中是否也存在长时间超时)。

因此,您将不得不使用原始消息并执行以下操作:message.acknowledge();。但是请注意,会话不是线程安全的,因此如果您在两个不同的线程中执行此操作,请小心。

于 2012-12-06T12:23:41.367 回答