2

我试图展示在 Spring MVC 中使用 Reactive Streams 的优势。为此,我有一个运行两个端点的小型 Jetty 服务器:

  • /normal返回一个 POJO
  • /flux返回包装在 a 中的相同对象Mono

然后我启动一个客户端并在其中一个端点同时启动几千个请求。我本来希望看到第二个端点的错误更少,因为处理是异步进行的。但是,我有时会在启用异步的端点上观察到更多错误;在这两种情况下,任何地方都有 60 - 90% 的错误Connection refused: no further information

要么我在这里做错了什么,要么我不太明白。Connection refused正是我希望避免的那种事情。

服务器

这是我来自服务器的代码。在这种normal情况下,我从字面上用 a 阻塞了线程.sleep()

@Controller
public class FluxController {
    @GetMapping(value = "/normal", produces = MediaType.APPLICATION_JSON_VALUE)
    public Map normal() throws Exception {
        Thread.sleep(randomTime());
        return Collections.singletonMap("type", "normal");
    }

    @GetMapping(value = "/flux", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<Map> flux() {
        return Mono.delay(Duration.ofMillis(randomTime()))
                .map(x -> Collections.singletonMap("type", "flux"));
    }

    private static long randomTime() {
        return ThreadLocalRandom.current().nextLong(200, 1000);
    }
}

服务器通过 Maven 在 Jetty 9.4.15 上运行,web.xml 定义为:

<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                             http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1">

客户

我的客户使用 Spring WebClient:

public class ClientApplication {

    private static final String ENDPOINT = "normal";

    private static final int REPETITIONS = 10_000;

    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");

        AtomicInteger errors = new AtomicInteger(0);

        List<Mono<Response>> responses = IntStream.range(0, REPETITIONS)
                .mapToObj(i -> client.get()
                        .uri(ENDPOINT)
                        .retrieve()
                        .bodyToMono(Response.class)
                        .doOnError(e -> errors.incrementAndGet())
                        .onErrorResume(e -> Mono.empty())
                )
                .collect(Collectors.toList());
        Mono.when(responses)
                .block();
        System.out.println(String.format("%-2f %% errors", errors.get() * 100.0 / REPETITIONS));
    }

    static class Response {
        public String type;
    }

}

与此处问题类似的前提:WebFlux async processing。主要区别在于我正在测试错误率,或同步连接数;我预计速度不会增加。

4

1 回答 1

2

事实证明,尽管 Servlet 3.1 规范支持非阻塞 IO,但 Spring MVC 不支持。要充分利用反应式 API,您必须使用 WebFlux。请参阅此处了解更多信息:https ://youtu.be/Dp_aJh-akkU?t=1738 。

此图展示了 Spring MVC(左侧)与 Webflux(右侧)相比的工作原理。

Spring MVC 与 Webflux 堆栈

我使用Gatling进行了更多测试,得到了相似的结果:两者花费的时间大致相同,异步的可靠性稍差。但是,我确实注意到了一个半可重现的差异:异步结果有时响应更快:

普通的

返回标准类型时的响应时间分布

第 50 个百分位:33.6 秒 第 95 个百分位:35.4 秒

反应性

返回 Flux 时的响应时间分布

第 50 个百分位:6.51 秒 第 95 个百分位:49.5 秒

我仍然不清楚在 Spring MVC 中使用异步调用(例如 DeferredResult)或 Reactive Streams API 的优势。因此,如果有人能够用具体的用例来澄清这一点,我们将不胜感激。

于 2019-05-15T07:19:55.337 回答