我有一个主题,其中有多个使用默认预取运行的订阅客户端。如果其中一个客户端速度很慢,它会减慢其他订阅的客户端。我想动态降低慢速消费者的预取限制,但由于客户端随机变慢,这需要动态完成。
我想对以下解决方案进行原型设计:为每个订阅者创建队列。线程池将从主题中删除事件并将事件复制到我的队列中。现在,由于我为每个订阅者设置了队列,因此每个客户端都是相互独立的。我将为每个队列设置预取限制。一旦达到该限制,我将放弃这些事件。缺点:现在每个队列都需要内存。
我想对上述解决方案或您认为可能适合我的情况的任何其他解决方案提出一些看法。
我在下面为我的用例添加了更多详细信息:
listener1 处理速度:142 rps - listener2 处理速度:10 rps
事件产生速度 - 100 rps
默认预取限制:32000
情况 1:当两个侦听器的预取限制相等时。在 ~ 761 秒内 - 主题在开始删除事件之前已满。
案例 2:当慢消费者的预取限制小于快速消费者的预取限制 listener2 预取限制:64K 以上解决方案效果很好
并非有时听众 2 的处理速度会增加,而听众 1 的处理速度会降低(注意处理速度不会完全反转,但我使用的是极值)并且 case2 不起作用。现在 listener1: 10 rps listener2: 142 rps 在开始丢弃事件之前,主题需要 1523 秒才能充满。一旦它开始丢弃事件,侦听器 1 也将以与侦听器 2 相同的速度开始处理。
我正在寻找让每个听众独立运行而不阻碍其他听众的建议?