2

我正在开发一个由 Spring Boot 2.1.1 和 WebFlux、Reactor 3.2.3、Mongo 3.8.2 和 Netty 4.1.31 运行的简单聊天服务。

每个聊天室都有 2 个集合 - 消息存档和包含当前事件的上限集合(例如,新消息事件、用户键入指示器等)。上限集合有 100 个元素,我正在使用 ReactiveMongoTemplate 的 tail() 方法来检索最新事件。

该服务公开了两种用于检索最近事件的端点:SSE 和轮询。我已经对 2000 个并发用户进行了一些压力测试,除了听聊天之外,还发送了大量的事件。

观察结果是:

  • 每 2 秒轮询一次会给服务带来一点压力(测试期间 CPU 使用率约为 40%),而 MongoDB 几乎没有压力(~4%)
  • 通过 SSE 收听最大化 MongoDB(~90%),也强调服务(它试图使用剩余的可用资源),但 Mongo 尤其挣扎,整体服务变得几乎没有响应。

观察结果似乎很明显,因为当我在测试期间通过 SSE 连接时,它几乎会在新事件到来时立即更新我 - 基本上,SSE 的响应速度是每 2 秒轮询的数百倍。

问题是:

鉴于客户端最终是订阅者(或者至少我认为它是由有限的知识给出的),我可以通过 ReactiveMongoTemplate 以某种方式限制发布消息的速率吗?或者以某种方式减少对新事件的需求,而不必做那个客户端?

我一直在尝试使用 Flux 缓冲和缓存,但它造成了更大的压力......

代码:

// ChatRepository.java

private static final Query chatEventsQuery = new Query();

public Flux<ChatEvent> getChatEventsStream(String chatId) {
    return reactiveMongoTemplate.tail(
            chatEventsQuery,
            ChatEvent.class,
            chatId
    );
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvent(username))
            .map(event -> ServerSentEvent.<ChatEvent>builder()
                    .event(event.getType().getEventName())
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

// ChatRouter.java

RouterFunction<ServerResponse> routes(ChatHandler handler) {
    return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}
4

1 回答 1

2

答案是: 你用Flux.buffer方法来做。然后,flux 将以定义的速率将事件批​​量发送给订阅者。

我发布的代码有两个主要问题

  1. 鉴于多个用户通常在听一个聊天,我重构了 ChatRepository 以利用存储在咖啡因缓存中的“热”、可重播的 Flux(现在每个聊天有 1 个流,而不是每个用户 1 个流)。此外,我以较短的时间间隔缓冲它们,以避免在繁忙的聊天中将事件推送给客户端时使用大量资源。

  2. new Query()我在 ChatRepository 中使用的是多余的。我查看了 ReactiveMongoTemplate 的代码,如果提供了非空查询,则逻辑要复杂一些。最好改为传递null 给 ReactiveMongoTemplate 的tail()方法。

代码后重构

// ChatRepository.java

public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
    return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
            .orElseGet(newCachedChatEventsStream(chatId))
            .autoConnect();
}

private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
    return () -> {
        ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
                null,
                ChatEvent.class,
                chatId
        ).buffer(Duration.ofMillis(chatEventsBufferInterval))
                .replay(chatEventsReplayCount);

        chatStreamsCache.put(chatId, chatEventsStream);

        return chatEventsStream;
    };
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvents(username))
            .map(event -> ServerSentEvent.<List<ChatEvent>>builder()
                    .event(CHAT_SSE_NAME)
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

应用这些更改后,即使有 3000 个活跃用户,该服务也能正常运行(JVM 使用约 50% 的 CPU,Mongo 约 7% 主要是由于大量插入 - 流现在不那么明显了)

于 2019-01-03T23:45:55.913 回答