2

我有一个 spring 应用程序,我想使用JMS 消息组来处理特定块中的 JMS 消息(以及相同的事务等)。基本上说我有 5 个相关事件,我有一个JMSTemplate以相同JMSXGroupID和连续的JMSXGroupSeq值发送它们。

然后我MessageProcessorService在 Spring 中定义了一个看起来像这样的东西:

<bean id="messageProcessorService" class="x.y.z.MessageProcessorService"/>
<jms:listener-container connection-factory="pooledJmsConnectionFactory" concurrency="5" >
    <jms:listener destination="messages.queue" ref="messageProcessorService" />
</jms:listener-container>

MessageProcessorService的是标准的,简单的:

@Service
public class MessageProcessorService implements MessageListener {

public void onMessage(Message msg) { ... }
}

问题是,因为 onMessage 一次只能收到 1 条消息。如何获取特定组中的所有 5 条消息,然后开始处理它们?

我知道我可以使用负值JMSXGroupSeq来标记组的结束,然后我想我可以保留一小部分消息并检查消息JMSXGroupSeq,当它是 -1 时处理整个组,但这似乎有点hacky,不确定它是否是线程安全的(我肯定需要并行处理多个线程)。

以前在 Spring/JMS/ActiveMQ 中还有其他人做过类似的事情吗?

4

2 回答 2

1

为了回答我自己的问题,我找到了这种方法并做了大致相似的事情。但它并不漂亮。我认为从长远来看,我们可能不得不放弃 MessageListenerContainer 并使用直接 JMS api 推出我们自己的解决方案。

于 2013-07-12T18:55:35.547 回答
0

好问题。虽然消息组对于某些任务非常有用,但它们并没有将这种优势映射到消息侦听器上。

您可以采用您的方法,在消息中保留一个列表。如果您在原型范围内创建侦听器 bean,则线程安全不会成为问题。

<bean id="messageProcessorService" class="x.y.z.MessageProcessorService" scope="prototype"/>

http://static.springsource.org/spring/docs/3.0.7.RELEASE/reference/beans.html#beans-factory-scopes-prototype

您只需要保留一些在课堂上收到的消息的内部列表,它们对于该线程将是唯一的。

这在一定程度上取决于您的交易要求。您是否需要围绕整个消息组提交事务?然后,您需要跟踪何时提交和何时不提交。IBM在清单 5 中做了一个示例(虽然没有使用 ActiveMQ)。它不是 Spring,而是普通的MDB。不能说它与 ActiveMQ 的工作原理相同,所以我不保证。

于 2012-08-30T20:46:40.190 回答