1

我正在解决处理具有不同优先级的 JMS 消息的问题。我使用 Apache Camel 框架作为 EIP 实现者。我有两个必须使用的队列。第一个包含优先级较高的消息,第二个包含优先级较低的消息。

现在我想创建一个包含循环的特殊组件“混合器”,执行以下步骤:

  1. 查看优先级队列
    • 如果它包含一条消息,它将把它发送到输出队列并进入一个循环的开始(如果它包含很多消息,它将处理前10条消息,然后转到步骤2)
    • 如果它不包含任何消息,请转到步骤 2
  2. 看第二个队列
    • 如果它包含一条消息,则将其发送到输出队列(但只有一条消息)并重复循环
    • 如果它不包含任何消息,则重复循环

如您所见,如果有很多消息(优先队列中的 10 条/第二条中的 1 条),我想保持一个特殊的比例。如果优先队列中没有消息,我们可以立即处理第二个队列中的消息。我想要像 EIP Resequencer 这样的东西,它将从多个输入队列中消耗。

我寻找了一条骆驼路线,我将从两个队列中消费,在那里我将拥有上面描述的组件“Mixer”。我想要这样的东西:

<route>
  <from id="A" />
  <from id="B" />
  <resequence id="mixer" />
  <to id="C" />
</route>

但我没有找到如何做到这一点的方法。一条路线只能有一个输入。如果它有更多的输入,Camel 将在内部复制路线(我们将有两条独立的路线)并且行为将如下:

<route>
  <from id="A" />
  <resequence id="mixer" />
  <to id="C" />
</route>

<route>
  <from id="B" />
  <resequence id="mixer" />
  <to id="C" />
</route>

如此处所述。这不是我想要的:-(。你有什么想法可以解决我的问题吗?提前谢谢!

4

1 回答 1

0

也许你可以看看Polling Consumer EIP。

我正在展示来自骆驼网站的一些示例代码,这可能是您解决方案的基础:

public void someBusinessLogic() {
    // loop to empty queue
    while (true) {
        // receive the message from the queue, wait at most 3 sec
        String msg = consumer.receiveBody("activemq:queue.inbox", 3000, String.class);
        if (msg == null) {
            // no more messages in queue
            break;
        }

        // do something with body
        msg = "Hello " + msg;

        // send it to the next queue
        producer.sendBodyAndHeader("activemq:queue.foo", msg, "number", count++);
    }
}

您可以尝试修改此代码以具有一个计数器,该计数器尝试从您的 hi-pri 队列中消耗 10 条消息,并从您的 lo-pri 队列中消耗 1 条消息。

玩得开心,祝你好运。有兴趣知道这是否成功:)

于 2013-12-06T15:20:33.830 回答