1

我发现带有选项 transacted=true 的 JMS 组件不适用于 Aggregator。我的路由在到达聚合器之前是事务性的,在聚合器之后我的路由不再是事务性的。如果我理解得很好,这是因为聚合器正在为每个交换转换启动新线程,并且这种行为会停止第一个事务线程,从而导致事务提交和确认发送到队列。所以消息不再在队列中。这个对吗?

我需要对 Camel 做的是从队列(RabbitMQ)中读取所有消息,然后将它们全部转换到一个列表中,最后将数据从列表写入文件。所有这些都必须是事务性的,如果路由中发生错误,消息应该保留在队列中。基本上我到目前为止所做的是从队列中读取 jms 组件(使用选项 transacted=true),然后使用聚合器聚合所有消息,然后发送到文件。

我的代码的缩短示例:

from("jms:queue:someQUeue?transacted=true")
    // some more processing in the route                          <- route is transactional here
    .aggregate((constant(true)), new MyAggregationStrategy())
    .completionInterval(500)
    .process(new Processor() {                                    <- route is not transactional anymore
               // some processing
    })
    .toD("file://C/tmp...")

我认为最完美的组件是Simple JMS-batch,但我们使用的是Camel 3.8,这个组件在这个版本和更新的版本中不再存在。为什么?有什么新东西代替这个吗?

SJMSSJMS2也对我没有帮助,当我使用它们时,消费者确实会读取所有消息(队列中的所有消息都未确认 - 等待确认),但通过路由的处理仍然是一条一条消息。我怎样才能使它成为与所有消息列表的一次交换?

是否有其他解决方案?(例如,制作一些自定义聚合器 - 但如何?)

4

1 回答 1

0

据我所知,这是正确的!如果您的意图是实现保证交付,在这种情况下,您最好使用持久聚合存储库

于 2021-06-11T03:11:08.183 回答