1

我有 mule 流,我使用 JMS 轮询每 5 分钟轮询一次队列并处理队列中的所有消息。每当实体有一些更新时,我都会在队列中收到消息。每当我看到消息时,我都会从消息中提取唯一 ID,并使用它通过 Web 服务调用检索更新的实体。实体可以多次更新,所以我需要捕获所有更新。例如,如果队列中有 10 条消息引用同一个实体更新,那么我需要忽略 9 条消息并只使用一条,因为所有消息都引用同一个实体。我尝试为此使用幂等消息过滤器。但它第一次运行良好。当应用程序在 5 分钟后第二次从队列中读取消息(轮询频率)并且有同一实体的消息(更新)时,幂等消息过滤器会忽略这些消息。理想情况下,应用程序应该在将来以及在不同的轮询间隔期间使用相同的消息(如果有的话)。只有在一个时间间隔内发现重复消息时,它才应该忽略这些消息。但是如果在未来的时间间隔内发现,那么它应该重复它在第一个时间间隔内执行的相同处理(忽略重复并只处理一个)。

我希望我能够解释我的问题。

任何帮助将不胜感激。

谢谢,维杰

4

4 回答 4

4

尝试设置entryTTL过滤器对象存储,如:

<idempotent-message-filter idExpression="#[yourExpression]">
  <in-memory-store entryTTL="300000"/>
</idempotent-message-filter>

这样,用于过滤消息的条目将在五分钟后过期

高温高压

于 2013-05-01T21:33:18.560 回答
1

idempotent-message-filter是您想要使用的,但您需要使用自定义配置它idExpression

<idempotent-message-filter idExpression="#[ ... ]" />

这个表达式的文档说:

定义从消息中提取 ID 时要使用的一个或多个表达式。例如,可以将标头组合为消息的 ID 以提供幂等性:'#[headers:foo,bar]'。或者,您可以将消息 ID 与标头组合:'#[message:id]-#[header:foo]'。如果未设置此属性,则默认使用“#[message:id]”。

然后的想法是制作一个表达式,该表达式的组件对于一次轮询(可能是系统时间的模数)相同,并且对于在一次轮询中被认为相同的消息具有相同的组件。

于 2013-05-01T21:37:45.270 回答
0

谢谢大卫/丹尼尔。我使用它如下:

idempotent-message-filter idExpression="XPATH 表达式检索实体 id" > in-memory-store entryTTL="60000"/> /idempotent-message-filter>

它对我来说工作正常。

感谢你们在这方面的帮助。

维杰

于 2013-05-02T14:20:00.127 回答
0
<idempotent-message-filter idExpression="#[message:payload]" doc:name="Idempotent Message" throwOnUnaccepted="true" onUnaccepted="ValidationFailFlow">
<in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" ></inmemory-store>
</idempotent-message-filter>
于 2017-01-09T09:57:57.567 回答