我试图展示在 Spring MVC 中使用 Reactive Streams 的优势。为此,我有一个运行两个端点的小型 Jetty 服务器:
/normal
返回一个 POJO/flux
返回包装在 a 中的相同对象Mono
然后我启动一个客户端并在其中一个端点同时启动几千个请求。我本来希望看到第二个端点的错误更少,因为处理是异步进行的。但是,我有时会在启用异步的端点上观察到更多错误;在这两种情况下,任何地方都有 60 - 90% 的错误Connection refused: no further information
。
要么我在这里做错了什么,要么我不太明白。Connection refused
正是我希望避免的那种事情。
服务器
这是我来自服务器的代码。在这种normal
情况下,我从字面上用 a 阻塞了线程.sleep()
:
@Controller
public class FluxController {
@GetMapping(value = "/normal", produces = MediaType.APPLICATION_JSON_VALUE)
public Map normal() throws Exception {
Thread.sleep(randomTime());
return Collections.singletonMap("type", "normal");
}
@GetMapping(value = "/flux", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<Map> flux() {
return Mono.delay(Duration.ofMillis(randomTime()))
.map(x -> Collections.singletonMap("type", "flux"));
}
private static long randomTime() {
return ThreadLocalRandom.current().nextLong(200, 1000);
}
}
服务器通过 Maven 在 Jetty 9.4.15 上运行,web.xml 定义为:
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1">
客户
我的客户使用 Spring WebClient:
public class ClientApplication {
private static final String ENDPOINT = "normal";
private static final int REPETITIONS = 10_000;
public static void main(String[] args) {
WebClient client = WebClient.create("http://localhost:8080");
AtomicInteger errors = new AtomicInteger(0);
List<Mono<Response>> responses = IntStream.range(0, REPETITIONS)
.mapToObj(i -> client.get()
.uri(ENDPOINT)
.retrieve()
.bodyToMono(Response.class)
.doOnError(e -> errors.incrementAndGet())
.onErrorResume(e -> Mono.empty())
)
.collect(Collectors.toList());
Mono.when(responses)
.block();
System.out.println(String.format("%-2f %% errors", errors.get() * 100.0 / REPETITIONS));
}
static class Response {
public String type;
}
}
与此处问题类似的前提:WebFlux async processing。主要区别在于我正在测试错误率,或同步连接数;我预计速度不会增加。