2

我正在使用 Spring WebSockets、Messaging 和 RabbitMQ 开发基于事件的实时应用程序。在这个应用程序中,消息需要按照它们插入 RabbitMQ 的确切顺序传递给客户端。

“已编辑”

我们的目标是从浏览器接收消息,在服务器上按顺序处理它(针对由路由参数确定的唯一对象),丰富消息并通过外部 STOMP MQ (RabbitMQ) 将其广播到所有订阅浏览器。

我们的MessageMapping方法如下:

@MessageMapping(/commands.{route}.{data}) public CommandMessage receiveCommand(CommandMessage message, Principal principal) {

try {

  // Get object to synch on using route
  Object o = ...

  syncrhonized(o) {

    // Perform command on object

    // Set message server sequence
    message.setServerSequence(o.getAutoIncrementSequence());

    // Log server sequence
    log.debug("Message server sequence:" + message.getServerSequence());

    // Send to external MQ for broadcasting to all subscribers
    return message;
  }

} catch (Exception e) {
  ...
}

return null;

}

如果我们将 ClientInboundChannel 和 ClientOutboundChannels 配置为 corePoolSize 和 maxPoolSize 各为 1,则所有消息都是有序的。

如果我们在 ClientInboundChannel 上增加 corePoolSize 和 maxPoolSize,消息到达 MQ 的顺序不正确;如果我们对 ClientOutboundChannel 进行同样的增加,消息会以错误的顺序到达浏览器。

]

测试是使用单个浏览器客户端完成的。

我们打开了对 StompBrokerRelayMessageHandler 的跟踪,并收到了如下日志条目:

2014-05-06 14:26:39 TRACE [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 处理消息=[Payload byte[303]][Headers={stompCommand=SEND, nativeHeaders={content-type=[ application/json;charset=UTF-8],destination=[/topic/commands.BKN01.20140318]},simpMessageType=MESSAGE,simpDestination=/topic/commands.BKN01.20140318,contentType=application/json;charset=UTF- 8,simpSessionId=ehjcoxb3,id=a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb,时间戳=1399400799940}] 2014-05-06 14:26:39 DEBUG [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler] java:658 消息处理程序。经纪人 2014-05-06 14:26:39 TRACE [clientInboundChannel-3] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] 忽略到目的地的消息=/app/commands.BKN01.20140318 2014-05-06 14:26:39 TRACE [clientInboundChannel-7] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] 忽略到目的地的消息=/app/commands.BKN01 .20140318 2014-05-06 14:26:39 TRACE [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 处理消息=[Payload byte[314]][Headers={stompCommand=SEND, nativeHeaders={content-type =[application/json;charset=UTF-8], 目的地=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset= UTF-8,simpSessionId=ehjcoxb3,id=3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd,时间戳=1399400799947}] 2014-05-06 14:26:39 调试 [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] 将消息转发给代理

我们还为 org.springframework.web.socket.messaging 包中的 StompSubProtocolHandler 开启了跟踪,并收到如下消息:

2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] oswsmStompSubProtocolHandler [StompSubProtocolHandler.java:180] 收到来自客户端 session=u8wrnsr6 的消息

这些信息都没有提供一种简单的方法来将我们的 message.serverSequence 属性(在发送到 MQ 之前设置)与日志详细信息的各种 id 进行映射。

有什么方法可以增加入站/出站通道线程,从而使排序保持不变?例如,通道是否可以绑定到“路线”,或者线程可以与“路线”挂钩?

请帮忙。

谢谢,

4

1 回答 1

2

[编辑]

谢谢,我现在明白了。事实上,目前没有办法确保来自外部代理的消息以完全相同的顺序传递给客户端,并且来自客户端的消息将以完全相同的顺序发送给外部代理。

我们可以为会话中的每条消息分配一个索引,然后在必要时检查和缓冲以强制执行顺序,但这将是一个新功能。随意在 JIRA 中创建请求。

至于现在,您可以考虑创建一个 ChannelInterceptor 来检查每条消息以查看它属于哪个会话,然后在发送之前在其上设置一些增量索引标头(在 clientInboundChannel 或 clientOutboundChannel 上)。然后扩展 StompSubProtocolHandler 和 StompBrokerRelayMessageHandler 以检查索引头并尝试强制执行消息的顺序。

于 2014-05-06T13:42:53.773 回答