1

我有一个带有订单的数据库,每个订单都有一个截止日期和一个创建日期。我想将最多 4 个订单拉入路由并同时处理它们。每个订单可能需要 10-20 分钟来处理。但我想尽可能多地保持所有线程运行,没有任何停机时间。

这是我现在拥有的:

from("timer://GetOrder?fixedRate=true&period=1s")
            .to("bean:orderInfoDao?method=getNextOrder")
            .to("jms://process-orders")
            .end();

from("jms://process-orders?concurrentConsumers=4")
            .to("bean:orderService?method=processOrder(${body})")
            .to("direct:send-result")
            .end();

DAO 函数按创建日期返回最旧的订单,该getNextOrder订单已超过到期日。立即尝试传入订单。

现在,问题是由于计时器的原因,传入的订单在 JMS 路由中堆积,当getNextOrder返回一个更旧的订单时,它在队列中远远落后。

有什么想法可以构建这些路线,以便为最旧的 4 个订单轮询数据库并同时执行这些订单吗?对 DAO 的更改是可以接受的。

有没有一种多线程生产者?

提前感谢您的建议!

4

1 回答 1

5
final Semaphore semaphore = new Semaphore(4); 

from("timer://GetOrder?period=1s")
            .to("bean:orderInfoDao?method=getNextOrder")
            .to("jms://process-orders")
            .process(new Processor() {
                 public void process(Exchange exchange) {
                     semaphore.acquire();
                 }
             })
            .end();

from("jms://process-orders?concurrentConsumers=4")
            .to("bean:orderService?method=processOrder(${body})")
            .process(new Processor() {
                 public void process(Exchange exchange) {
                     semaphore.release();
                 }
             })
            .to("direct:send-result")
            .end();

请注意,计时器fixedRate已关闭(默认)。

这是我想到的第一个想法,我希望有一些 Camel EIP 可以帮助以更好的方式实现这个逻辑。

于 2015-06-11T22:36:24.060 回答