2

我的应用程序使用 Spring Framework 4 包含的 spring-messaging 模块(具有来自 Spring Integration 项目的关键抽象,例如 Message、MessageChannel、MessageHandler 和其他可以作为此类消息传递架构的基础的其他东西。)

我的应用程序使用 Websocket 和 STOMP。它与大量 java websocket 客户端保持连接(websocket 会话),其中一个要求是使用 akka 或反应器。

我想在 clientInboundChannelExecutor 和 clientOutboundChannelExecutor 中集成 spring-reactor RingBufferAsyncTaskExecutor 代替 ThreadPoolTask​​Executor 以获得更好的吞吐量。至少我已经将这种方法确定为将 spring-reactor 集成到我现有应用程序中的方法——这可能不是正确的方法。

我正在查看 reactor-si-quickstart,因为它演示了如何将反应器与 spring 集成一起使用,并且 Spring Framework 4 中的 spring-messaging 包含来自 Spring Integration 项目的关键抽象。我认为这将是最接近的参考。

我的 web socket 工作 java 配置具有以下类声明 public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport 实现 WebSocketMessageBrokerConfigurer。WebSocketMessageBrokerConfigurationSupport 扩展了 AbstractMessageBrokerConfiguration。

在 org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration 我想尝试配置 RingBufferAsyncTaskExecutor 代替 ThreadPoolTask​​Executor

@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
    TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
    ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    executor.setThreadNamePrefix("clientInboundChannel-");
    return executor;
}

当我尝试在 WebSocketConfig 中覆盖此方法时,“ChannelRegistration 类型的方法 getOrCreateTaskExecRegistration() 不可见”,因为在 AbstractMessageBrokerConfiguration 它受到保护......

protected final ChannelRegistration getClientInboundChannelRegistration() {
    if (this.clientInboundChannelRegistration == null) {
        ChannelRegistration registration = new ChannelRegistration();
        configureClientInboundChannel(registration);
        this.clientInboundChannelRegistration = registration;
    }
    return this.clientInboundChannelRegistration;
}

我不完全理解我的 WebSocketConfig 中的 WebSocketMessageBrokerConfigurationSupport 层次结构或 WebSocketMessageBrokerConfigurer 接口。我只是在尝试覆盖我的自定义工作所需的内容。

不确定它是否相关,但我不需要外部代理,因为我的应用程序目前没有向所有连接的订阅者发送任何数据,而且不太可能下线。与守护进程类型的 java websocket 客户端的通信是点对点的,但是浏览器中的 web ui websocket 确实使用订阅来获取实时数据,因此它是一个方便的设置(而不是 spring 集成直接通道)并且有明确的来源如何设置它 - 我仍然不确定它是最有效的应用程序设计。spring-framework 参考文档中描述的基于 WebSocket 消息传递架构的 STOMP 是最全面的方法,因为这是我的第一个 spring 项目。

是否有可能通过将 spring-reactor 集成到我现有的应用程序中来提高性能?

或者我是否应该尝试使用 spring 集成,据我所知,这将需要进行大量修改 - 鉴于 Spring Framework 4 包含的 spring-messaging 模块来自 spring 集成,这似乎是不合逻辑的。

我应该如何将 spring-reactor 集成到我的标准 spring 框架中 4 STOMP Over WebSocket 消息架构?

如果在 clientInboundChannelExecutor 和 clientOutboundChannelExecutor 中配置 RingBufferAsyncTaskExecutor 代替 ThreadPoolTask​​Executor 是正确的方法,我应该怎么做?

4

1 回答 1

3

实际上RingBufferAsyncTaskExecutoris not ThreadPoolTaskExecutor,所以你不能那样使用它。

您可以简单地从您的impl中覆盖clientInbound(Outbound)Channelbean,然后使用:AbstractWebSocketMessageBrokerConfigurer@EnableWebSocketMessageBroker

@Configuration
@EnableWebSocketMessageBroker
@EnableReactor
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @autowired
    Environment reactorEnv;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry configurer) {
        configurer.setApplicationDestinationPrefixes("/app");
        configurer.enableSimpleBroker("/topic", "/queue");
    }

    @Bean
    public AbstractSubscribableChannel clientInboundChannel() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(new RingBufferAsyncTaskExecutor(this.reactorEnv));
        ChannelRegistration reg = getClientOutboundChannelRegistration();
        channel.setInterceptors(reg.getInterceptors());
        return channel;
    }

}

请注意Spring Integration中的 WebSocket 支持。

顺便说一句:请指出我的链接reactor-si-quickstart

于 2014-11-21T10:53:26.803 回答