我正在使用 Spring WebSockets、Messaging 和 RabbitMQ 开发基于事件的实时应用程序。在这个应用程序中,消息需要按照它们插入 RabbitMQ 的确切顺序传递给客户端。
“已编辑”
我们的目标是从浏览器接收消息,在服务器上按顺序处理它(针对由路由参数确定的唯一对象),丰富消息并通过外部 STOMP MQ (RabbitMQ) 将其广播到所有订阅浏览器。
我们的MessageMapping方法如下:
@MessageMapping(/commands.{route}.{data}) public CommandMessage receiveCommand(CommandMessage message, Principal principal) {
try {
// Get object to synch on using route
Object o = ...
syncrhonized(o) {
// Perform command on object
// Set message server sequence
message.setServerSequence(o.getAutoIncrementSequence());
// Log server sequence
log.debug("Message server sequence:" + message.getServerSequence());
// Send to external MQ for broadcasting to all subscribers
return message;
}
} catch (Exception e) {
...
}
return null;
}
如果我们将 ClientInboundChannel 和 ClientOutboundChannels 配置为 corePoolSize 和 maxPoolSize 各为 1,则所有消息都是有序的。
如果我们在 ClientInboundChannel 上增加 corePoolSize 和 maxPoolSize,消息到达 MQ 的顺序不正确;如果我们对 ClientOutboundChannel 进行同样的增加,消息会以错误的顺序到达浏览器。
]
测试是使用单个浏览器客户端完成的。
我们打开了对 StompBrokerRelayMessageHandler 的跟踪,并收到了如下日志条目:
2014-05-06 14:26:39 TRACE [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 处理消息=[Payload byte[303]][Headers={stompCommand=SEND, nativeHeaders={content-type=[ application/json;charset=UTF-8],destination=[/topic/commands.BKN01.20140318]},simpMessageType=MESSAGE,simpDestination=/topic/commands.BKN01.20140318,contentType=application/json;charset=UTF- 8,simpSessionId=ehjcoxb3,id=a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb,时间戳=1399400799940}] 2014-05-06 14:26:39 DEBUG [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler] java:658 消息处理程序。经纪人 2014-05-06 14:26:39 TRACE [clientInboundChannel-3] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] 忽略到目的地的消息=/app/commands.BKN01.20140318 2014-05-06 14:26:39 TRACE [clientInboundChannel-7] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] 忽略到目的地的消息=/app/commands.BKN01 .20140318 2014-05-06 14:26:39 TRACE [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] 处理消息=[Payload byte[314]][Headers={stompCommand=SEND, nativeHeaders={content-type =[application/json;charset=UTF-8], 目的地=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset= UTF-8,simpSessionId=ehjcoxb3,id=3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd,时间戳=1399400799947}] 2014-05-06 14:26:39 调试 [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] 将消息转发给代理
我们还为 org.springframework.web.socket.messaging 包中的 StompSubProtocolHandler 开启了跟踪,并收到如下消息:
2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] oswsmStompSubProtocolHandler [StompSubProtocolHandler.java:180] 收到来自客户端 session=u8wrnsr6 的消息
这些信息都没有提供一种简单的方法来将我们的 message.serverSequence 属性(在发送到 MQ 之前设置)与日志详细信息的各种 id 进行映射。
有什么方法可以增加入站/出站通道线程,从而使排序保持不变?例如,通道是否可以绑定到“路线”,或者线程可以与“路线”挂钩?
请帮忙。
谢谢,
担