52

我是Spring Web-Flux的初学者。我写了一个控制器如下:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

我知道反应性好处之一是Backpressure,它可以平衡请求或响应率。我想了解如何在Spring Web-Flux中具有背压机制。

4

1 回答 1

102

WebFlux 中的背压

为了理解 Backpressure 在当前 WebFlux 框架的实现中是如何工作的,我们必须在这里回顾一下默认使用的传输层。我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器的通信通常也一样)是通过 TCP 连接完成的。WebFlux 也使用该传输在客户端和服务器之间进行通信。然后,为了了解背压控制术语的含义,我们必须从 Reactive Streams 规范的角度回顾一下背压的含义。

基本语义定义了如何通过背压调节流元素的传输。

因此,从该陈述中,我们可以得出结论,在 Reactive Streams 中,背压是一种通过传输(通知)接收者可以消费多少元素来调节需求的机制;这里有一个棘手的问题。TCP 具有字节抽象而不是逻辑元素抽象。我们通常所说的背压控制想要的是控制发送/接收到/从网络的逻辑元素的数量。即使 TCP 有自己的流量控制(请参见此处的含义和此处的动画) 此流量控制仍然是针对字节而不是针对逻辑元素的。

在WebFlux模块目前的实现中,背压是由传输流控制来调节的,但它并没有暴露接收者的真实需求。为了最终看到交互流程,请看下图:

在此处输入图像描述

为简单起见,上图显示了两个微服务之间的通信,其中左侧发送数据流,右侧使用该流。以下编号列表提供了该图的简要说明:

  1. 这是 WebFlux 框架,它对逻辑元素到字节的转换以及与 TCP(网络)之间的传输/接收进行了适当的处理。
  2. 这是在作业完成后请求下一个元素的元素的长时间运行处理的开始。
  3. 在这里,虽然业务逻辑没有需求,但 WebFlux 将来自网络的字节排入队列而无需他们的确认(业务逻辑没有需求)。
  4. 由于 TCP 流控制的性质,服务 A 仍可能向网络发送数据。

从上图中我们可以注意到,接收者暴露的需求与发送者的需求不同(这里的需求是逻辑元素)。这意味着两者的需求是隔离的,仅适用于 WebFlux <-> 业务逻辑(服务)交互,对 Service A <-> Service B 交互的背压暴露较少。这一切都意味着 WebFlux 中的背压控制并不像我们预期的那样公平。

这一切都意味着 WebFlux 中的背压控制并不像我们预期的那样公平。

但是我还是想知道怎么控制背压

如果我们仍然想对 WebFlux 中的背压进行不公平的控制,我们可以在 Project Reactor 运营商的支持下做到这一点,例如limitRate(). 以下示例显示了我们如何使用该运算符:

@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
    
    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}

从示例中我们可以看出,limitRate()运算符允许定义一次预取的元素数量。这意味着即使最终订阅者请求Long.MAX_VALUE元素,limitRate运营商也会将该需求拆分为多个块,并且不允许一次消耗更多。我们可以对元素发送过程做同样的事情:

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    
    return tweetService.retreiveAll()
                       .limitRate(10);
}

上面的示例表明,即使 WebFlux 一次请求超过 10 个元素,也会limitRate()将需求限制为预取大小,并防止一次消耗超过指定数量的元素。

另一种选择是实现自己的Subscriber或扩展BaseSubscriberfrom Project Reactor。例如,以下是我们如何做到这一点的天真示例:

class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }
    
    @Override
    protected void hookOnNext(T value) {
        // do business logic there 

        consumed++;
        
        if (consumed == limit) {
            consumed = 0;
            
            request(limit);
        }
    }
}

RSocket 协议的公平背压

为了通过网络边界实现逻辑元素背压,我们需要一个合适的协议。幸运的是,有一个叫做RScoket 协议。RSocket 是一种应用层协议,允许通过网络边界传输实际需求。该协议有一个 RSocket-Java 实现,允许设置 RSocket 服务器。在服务器到服务器通信的情况下,相同的 RSocket-Java 库也提供了客户端实现。要了解更多如何使用 RSocket-Java,请在此处查看以下示例。对于浏览器-服务器通信,有一个RSocket-JS实现,它允许通过 WebSocket 连接浏览器和服务器之间的流式通信。

基于 RSocket 的已知框架

现在有一些框架,建立在 RSocket 协议之上。

变形虫

其中一个框架是 Proteus 项目,它提供了基于 RSocket 构建的成熟微服务。此外,Proteus 与 Spring 框架很好地集成,所以现在我们可以实现公平的背压控制(参见那里的示例)

进一步阅读

于 2018-09-09T13:55:24.543 回答