1

我有一个要求,我目前不知道它是否可能。如果消息包含指定的属性,我想暂时禁用 JMS 消息的开发。目前我使用 HornetQ 作为消息提供者。

让我们举个例子:

该队列包含以下三个条目:

{1, "foo", "A_CATEGORY"}
{2, "bar", "B_CATEGORY"}
{9, "bof", "A_CATEGORY"}

在某个时刻,应用程序必须能够告诉 HornetQ 消息服务器,此时不应传递属于 B_CATEGORY 的消息(例如,因为 B_CATEGORY 对象的底层数据库已更新)。因此,id 为 2 的消息目前不会被传递,而 1 和 9 将被传递,因为它们对于类别对象具有不同的值。

它必须发生在 Java 代码之外,根本不需要重新启动应用程序。这可能吗?

谢谢你的帮助!


刚刚想到了解决这个问题的另一种设计方法。让我们假设第一个 Queue 包含具有各种类别的消息(顺便说一句,不可能为每个类别创建一个队列,因为它们可能很多)。这个“正常”队列是正常配置的(例如,没有过期,但是 DLQ)。

现在,如果侦听器使用这样的消息并发现它无法处理属于某个类别的消息,它会将其放入第二个队列。此队列配置了重新传递延迟和到期时间。如果现在将到期时间设置得足够高(当然不是队列溢出)并且重新传递时间不太短,那么如果上述问题没有解决方案,这应该可以解决。

当然,必须计算在无法处理类别期间可以创建多少这些队列条目。还有一个类别的这种不可用可能需要多长时间,以便可以相应地调整重新交付。

4

2 回答 2

1

据我所知,消息驱动的 bean 是不可能的。

使用标准 JMS 使用者可以实现类似的功能:

 MessageConsumer c = session.createConsumer(destination);
 while ( b-category-can-be-processed ) {
     Message m = c.receive();
     // process messages until b category is OK to be processed
 }

 c.close();

 // now create a different consumer with message selector ignoring "B_CATEGORY"
 MessageConsumer c1 = session.createConsumer(destination, "Category <> 'B_CATEGORY'");
 while ( b-is-locked ) {
     Message m = c1.receive();
     // process messages until b category is locked
 }

 c1.close();
 // go to start

此示例假设您能够根据收到的消息判断何时再次处理 B。如果没有,那么您可以在一定时间后恢复正常的例行程序。该示例还仅展示了一个执行线程。

进一步探索这条路径,您可以看看 Spring 的DefaultMessageListenerContainer— Spring 消息驱动 bean。它可以完全按照我的描述进行,但方式要先进得多。它可以提供消息选择器,并且它是实时的,您可以随时更改它。如果您设置concurrentConsumers大于 1,它也会在多个线程中处理消息。

至于您在无法处理消息时将消息重定向到另一个队列的解决方案,请注意它会产生额外的流量;您确实希望最终处理所有消息,对吗?为什么不将它们留在原处并在适当的时候取回它们呢?您不必估计未来的重新交付延迟,这可能很难。

于 2011-10-09T20:47:58.823 回答
0

您可以使用过滤器创建核心队列(或订阅)并使用管理 API 停止队列。或者,如果您正在嵌入式工作,您可能会导致服务器队列对象暂停。

由于这将是一个非常自定义的功能,您可以将其嵌入使用,或者在您自己的分支中进行特殊调整。

于 2011-10-10T14:19:29.613 回答