我正在使用 Apache Camel 2.13.1 来轮询一个数据库表,其中包含超过 300k 行。我正在寻找使用幂等消费者 EIP来过滤已处理的行。
不过,我想知道该实现是否真的可扩展。我的骆驼背景是: -
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route id="main">
<from
uri="sql:select * from transactions?dataSource=myDataSource&consumer.delay=10000&consumer.useIterator=true" />
<transacted ref="PROPAGATION_REQUIRED" />
<enrich uri="direct:invokeIdempotentTransactions" />
<!-- Any processors here will be executed on all messages -->
</route>
<route id="idempotentTransactions">
<from uri="direct:invokeIdempotentTransactions" />
<idempotentConsumer
messageIdRepositoryRef="jdbcIdempotentRepository">
<ognl>#{request.body.ID}</ognl>
<!-- Anything here will only be executed for non-duplicates -->
<log message="non-duplicate" />
<to uri="stream:out" />
</idempotentConsumer>
</route>
</camelContext>
似乎每 10 秒(通过 consumer.delay 参数)处理完整的 300k 行,这似乎非常低效。我希望某种反馈循环作为模式的一部分,以便提供过滤器的查询可以利用已处理的行集。
但是,CAMEL_MESSAGEPROCESSED 表中的 messageid 列的模式为
{1908988=null}
其中 1908988 是 request.body.ID 我已将 EIP 设置为 key on,因此这不容易合并到我的查询中。
有没有更好的方法使用 CAMEL_MESSAGEPROCESSED 表作为我的 select 语句的反馈循环,以便 SQL 服务器执行大部分负载?
更新:
所以,我后来发现是我的 ognl 代码导致了奇怪的消息 id 列值。将其更改为
<el>${in.body.ID}</el>
已修复它。所以,既然我有一个可用的 messageId 列,我现在可以将我的“来自”SQL 查询更改为
select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')
但我仍然认为我正在破坏 Idempotent Consumer EIP。
还有其他人这样做吗?有什么理由不这样做?