1

我有一个 Camel 路由,可以将消息从队列中出列,将其发送到 bean 进行处理,然后将消息重新排入另一个队列。

我正在尝试消除第二个队列上的“重复消息”。Camel 是否有任何端点、处理器、EIP 等我可以配置为在发送到第二个队列之前在途中对消息进行重复数据删除?

例子:

<route id="myRoute">
    <from uri="{{queue-1-uri}}" />
    <to uri="bean:myBean?method=process" />
    <!-- How to dedupe right here??? -->
    <to uri="{{queue-2-uri}}" />
</route>

更新:也许是这样的:

<route id="myRoute">
    <from uri="{{queue-1-uri}}" />
    <to uri="bean:myBean?method=process" />
    <filter>
        <method>what goes here???</method>
        <to uri="{{queue-2-uri}}" />
    </filter>
</route>

根据 Ralf 的建议,我可以在其中引用一个 bean <method></method>,然后使用缓存将消息保存在内存中。

假设这个新 bean 被调用FilterBean并且它有一个dedupe()方法:我如何在 Spring XML 中连接它,以及 bean 需要实现哪些类/接口才能从路由内部调用?

4

1 回答 1

3

我认为您正在寻找Idempotent Consumer骆驼提供的。根据您的需求,有不同的方法,例如 Memory, JDBC, Hazelcast... 我不确定它是否适合您,因为您使用的是beanafter the consumer,但值得一试。Camel网站的简单示例如下:

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>

您可以在此处找到更多信息:幂等消费者

于 2014-02-13T22:18:15.327 回答