我正在开发一个由 Spring Boot 2.1.1 和 WebFlux、Reactor 3.2.3、Mongo 3.8.2 和 Netty 4.1.31 运行的简单聊天服务。
每个聊天室都有 2 个集合 - 消息存档和包含当前事件的上限集合(例如,新消息事件、用户键入指示器等)。上限集合有 100 个元素,我正在使用 ReactiveMongoTemplate 的 tail() 方法来检索最新事件。
该服务公开了两种用于检索最近事件的端点:SSE 和轮询。我已经对 2000 个并发用户进行了一些压力测试,除了听聊天之外,还发送了大量的事件。
观察结果是:
- 每 2 秒轮询一次会给服务带来一点压力(测试期间 CPU 使用率约为 40%),而 MongoDB 几乎没有压力(~4%)
- 通过 SSE 收听最大化 MongoDB(~90%),也强调服务(它试图使用剩余的可用资源),但 Mongo 尤其挣扎,整体服务变得几乎没有响应。
观察结果似乎很明显,因为当我在测试期间通过 SSE 连接时,它几乎会在新事件到来时立即更新我 - 基本上,SSE 的响应速度是每 2 秒轮询的数百倍。
问题是:
鉴于客户端最终是订阅者(或者至少我认为它是由有限的知识给出的),我可以通过 ReactiveMongoTemplate 以某种方式限制发布消息的速率吗?或者以某种方式减少对新事件的需求,而不必做那个客户端?
我一直在尝试使用 Flux 缓冲和缓存,但它造成了更大的压力......
代码:
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}