2

我有以下情况:

我将 100 条消息推送到 2 个消费者共享的队列中。两个订阅者都以预获取模式和显式模式订阅队列。处理完每条消息后,每个订阅者都会接受该消息以将其从队列中删除。伪代码看起来像这样:

OnMessageTransfer(message) :
    DoSomethingWithMessage(message)
    Session.MessageAccept(message)

消息负载均衡正确,每条消息只处理一次,但我们发现它没有考虑每个消费者的处理时间。例如,假设消费者 A 需要 50 毫秒来处理一条消息,而消费者 B 需要 5 秒。理想情况下,消费者 B 应该开始处理 1 条消息,与此同时,消费者 A 应该处理其他 99 条消息。但是,消费者 B 实际上会在 50 秒内处理 25 条消息,而消费者 A 将在大约 4 秒内处理其他 75 条消息,并且将处于空闲状态。客户端 api 似乎预取了消息,这在这种情况下显然不是最佳的。

我们如何解决这个问题?

我们使用的是 Qpid cpp 0.5 和完全托管的 c# 0-10 客户端 API,而不是 cpp 绑定(但我的理解是这种行为与 API 的实现无关)

问候,

朱利安

4

1 回答 1

2

可以通过为队列配置消息流来更改此行为。为避免预取消息,应将其设置为 1 条消息的信用并在处理完每条消息后更新。伪代码如下所示:

InitSubscription(queue) : 
    MessageSubscribe(queue, AcceptMode.Explicit, AcquireMode.PreAcquired)
    MessageSetFlowMode(queue, FlowMode.Credit)
    MessageFlow(queue, CreditUnit.Byte, MAX_BYTES)
    MessageFlow(queue, CreditUit.Message, 1) // will disable prefetch

OnMessageTransfer(message) :
    DoSomethingWithMessage(message)
    MessageAccept(message)
    MessageFlow(queue, CreditUit.Message, 1) // reissue a credit for 1 and only 1 message
于 2011-03-08T08:38:03.743 回答