2

我正在使用 Apache Camel 2.13.1 来轮询一个数据库表,其中包含超过 300k 行。我正在寻找使用幂等消费者 EIP来过滤已处理的行。

不过,我想知道该实现是否真的可扩展。我的骆驼背景是: -

<camelContext xmlns="http://camel.apache.org/schema/spring">
        <route id="main">
        <from
            uri="sql:select * from transactions?dataSource=myDataSource&amp;consumer.delay=10000&amp;consumer.useIterator=true" />
        <transacted ref="PROPAGATION_REQUIRED" />
        <enrich uri="direct:invokeIdempotentTransactions" />                
        <!-- Any processors here will be executed on all messages -->
    </route>

    <route id="idempotentTransactions">
        <from uri="direct:invokeIdempotentTransactions" />
        <idempotentConsumer
            messageIdRepositoryRef="jdbcIdempotentRepository">
            <ognl>#{request.body.ID}</ognl>
            <!-- Anything here will only be executed for non-duplicates -->
            <log message="non-duplicate" />
            <to uri="stream:out" />
        </idempotentConsumer>
    </route>            
</camelContext>

似乎每 10 秒(通过 consumer.delay 参数)处理完整的 300k 行,这似乎非常低效。我希望某种反馈循环作为模式的一部分,以便提供过滤器的查询可以利用已处理的行集。

但是,CAMEL_MESSAGEPROCESSED 表中的 messageid 列的模式为

 {1908988=null}

其中 1908988 是 request.body.ID 我已将 EIP 设置为 key on,因此这不容易合并到我的查询中。

有没有更好的方法使用 CAMEL_MESSAGEPROCESSED 表作为我的 select 语句的反馈循环,以便 SQL 服务器执行大部分负载?

更新:

所以,我后来发现是我的 ognl 代码导致了奇怪的消息 id 列值。将其更改为

<el>${in.body.ID}</el>

已修复它。所以,既然我有一个可用的 messageId 列,我现在可以将我的“来自”SQL 查询更改为

select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')

但我仍然认为我正在破坏 Idempotent Consumer EIP。

还有其他人这样做吗?有什么理由不这样做?

4

1 回答 1

1

是的。但是您需要使用可扩展的存储来保存已处理的消息集。您可以使用 Hazelcast - http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html或 Infinispan - http://java.dzone.com/articles/clustered-idempotent-consumer - 取决于哪个解决方案已经在您的堆栈中。当然,JDBC 存储库也可以工作,但前提是它满足所选的性能标准。

于 2014-10-01T14:43:45.187 回答