我一直在尝试使用 Java Flow API 构建应用程序。虽然在发布者和订阅者速度不同的情况下能够在发布者和订阅者之间执行背压的想法,但我不确定它是否真的有帮助,因为发布者和消费者通常都驻留在同一个应用程序中;至少,这就是几乎所有在线示例的样子。
例如,在我的应用程序中,有一个发布者生成从 RabbitMQ 检索到的消息,而一个订阅者处理这些消息。因此,消息被提交给 RabbitMQ 监听器中的发布者,如下所示:
@RabbitListener(queues = "rabbit_queue")
public void rabbitHandler(MessageObject message) {
// do some stuff to the message and then submit it to the publisher
publisher.submit(message);
}
// Then the message will be processed in the subscriber
如果发布者的生产速度快于订阅者的处理速度,订阅者可以调用一个小的n值 on subscription.request(n)。但是,有两件事我不确定我对如何request(n)帮助的理解是否正确:
- 由于在这种情况下发布者和订阅者都在同一个 Spring 应用程序中,它们几乎共享并受到相同数量的资源的限制。如果订阅者因为发送给它的元素太多而将耗尽内存或资源,我们应该能够减少
n.request(n)但这意味着发布者中的缓冲区大小将很快被填满。我可以增加发布者中的缓冲区大小,但我也受到订阅者面临的相同资源数量的限制,因为发布者和订阅者都在使用同一组资源的同一应用程序中。那么拥有发布者和request()方法的所有这些额外复杂性有什么意义呢? - 在我看来,出版商通常从某些来源接收其元素。有时,并非所有这些来源都可以被限制。就我而言,我有一个 RabbitMQ 侦听器将消息发送给发布者。但是发布者将这些消息发送到订阅的速率很大程度上取决于
rabbitHandler从 RabbitMQ 队列接收消息的速率。如果 RabbmitMQ 发送消息的速度超过了发布者的订阅者可以处理的速度,那么消息的缓冲仍然在应用程序内的发布者和订阅者之间完成,就会出现上述问题。
我很确定我对这个过程的理解有些错误,因为这对我来说就像是第 22 条问题。就像我的两只手只能握住这么多球,而我只是在两只手之间传递球并称之为背压。由于发布者和订阅者都受到相同数量的资源的限制,因为它们都在同一个应用程序中,当我可以简单地将消息传递给另一个处理程序并受到相同数量的限制时,具有这种额外复杂性的好处是什么资源也是这样的:
public class RabbitMqListener {
@RabbitListener(queues = "rabbit_queue")
public void rabbitHandler(MessageObject message) {
// do some stuff to the message and then submit it to the publisher
MessageProcessor.process(message);
}
}
public class MessageProcessor {
public static void process(MessageObject message) {
System.out.println("processing message...");
}
}
如果有人可以帮助我纠正我的理解,那就太好了。