1

我有一个主题,其中有多个使用默认预取运行的订阅客户端。如果其中一个客户端速度很慢,它会减慢其他订阅的客户端。我想动态降低慢速消费者的预取限制,但由于客户端随机变慢,这需要动态完成。

我想对以下解决方案进行原型设计:为每个订阅者创建队列。线程池将从主题中删除事件并将事件复制到我的队列中。现在,由于我为每个订阅者设置了队列,因此每个客户端都是相互独立的。我将为每个队列设置预取限制。一旦达到该限制,我将放弃这些事件。缺点:现在每个队列都需要内存。

我想对上述解决方案或您认为可能适合我的情况的任何其他解决方案提出一些看法。

我在下面为我的用例添加了更多详细信息:
listener1 处理速度:142 rps - listener2 处理速度:10 rps

事件产生速度 - 100 rps

默认预取限制:32000

情况 1:当两个侦听器的预取限制相等时。在 ~ 761 秒内 - 主题在开始删除事件之前已满。

案例 2:当慢消费者的预取限制小于快速消费者的预取限制 listener2 预取限制:64K 以上解决方案效果很好

并非有时听众 2 的处理速度会增加,而听众 1 的处理速度会降低(注意处理速度不会完全反转,但我使用的是极值)并且 case2 不起作用。现在 listener1: 10 rps listener2: 142 rps 在开始丢弃事件之前,主题需要 1523 秒才能充满。一旦它开始丢弃事件,侦听器 1 也将以与侦听器 2 相同的速度开始处理。

我正在寻找让每个听众独立运行而不阻碍其他听众的建议?

4

1 回答 1

1

您是否查看过 ActiveMQ 页面上处理慢速消费者的文档。基本上,该策略是使用挂起消息限制策略让代理开始为移动缓慢并导致备份的消费者丢弃旧消息。由于缓慢的消费者会导致在 Broker 上构建消息,因此您开始达到配置限制,从而导致 Broker 减慢生产者的速度。通过制定未决消息限制,您可以防止减慢速度的堆积。

您还可以关闭生产者流控制以允许生产者继续以正常速率运行,并让消息假脱机到磁盘,直到磁盘也达到其限制,这也在文档中进行了介绍。

于 2013-03-16T16:28:39.993 回答