45

我正在将 Spring Websocket 与 STOMP(简单消息代理)一起使用。在我的@Controller我使用 method-level @SubscribeMapping,它应该为客户端订阅一个主题,以便客户端随后接收该主题的消息。假设,客户端订阅了“聊天”主题:

stompClient.subscribe('/app/chat', ...);

由于客户端订阅了“/app/chat ”,而不是“/topic/chat”,因此该订阅将转到使用映射的方法@SubscribeMapping

@SubscribeMapping("/chat")
public List getChatInit() {
    return Chat.getUsers();
}

这是 Spring 参考。说:

默认情况下,@SubscribeMapping 方法的返回值作为消息直接发送回连接的客户端,并且不通过代理。这对于实现请求-回复消息交互很有用;例如,在初始化应用程序 UI 时获取应用程序数据。

好的,这就是我想要的,但只是部分!订阅后发送一些初始化数据,好吧。但是订阅呢?在我看来,这里发生的事情只是一个request-reply,就像一个服务一样。订阅刚刚被消耗。如果是这种情况,请澄清我。

  • 如果经纪人没有参与,客户是否订阅了某个地方?
  • 如果稍后我想向“聊天”订阅者发送一些消息,客户端会收到吗?似乎并非如此。
  • 谁真正意识到订阅?经纪人?还是其他人?

如果这里客户没有在任何地方订阅,我想知道为什么我们称之为“订阅”;因为客户端只收到一条消息,而不是未来的消息。

编辑:

为了确保订阅已经实现,我尝试如下:

服务器端:

配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/hello").withSockJS();
    }
}

控制器:

@Controller
public class GreetingController {

    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public Greeting greeting(HelloMessage message) throws Exception {
        System.out.println("inside greeting");
        return new Greeting("Hello, " + message.getName() + "!");
    }

    @SubscribeMapping("/topic/greetings")
    public Greeting try1() {
        System.out.println("inside TRY 1");
        return new Greeting("Hello, " + "TRY 1" + "!");
    }
}

客户端:

...
    stompClient.subscribe('/topic/greetings', function(greeting){
                        console.log('RECEIVED !!!');
                    });
    stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
...

我想发生的事情:

  1. 当客户端订阅 ' /topic/greetings' 时,该方法try1被执行。
  2. 当客户端向' /app/hello'发送消息时,它应该收到问候消息,即@SendTo' /topic/greetings'。

结果:

  1. 如果客户端订阅/topic/greetings,则该方法try1无法捕获它。

  2. 当客户端发送 msg 到 ' /app/hello' 时,greeting方法被执行,并且客户端收到了问候消息。所以我们知道它已经/topic/greetings正确地订阅了''。

  3. 但请记住 1. 失败了。经过一番尝试,客户端订阅的时候已经可以了'/app/topic/greetings',即前缀/app(这个可以通过配置理解)。

  4. 现在 1. 正在工作,但是这次 2. 失败:当客户端发送 msg 到 ' /app/hello' 时,是的,greeting方法已执行,但客户端没有收到问候消息。(因为可能现在客户端订阅了以 ' /app' 为前缀的主题,这是不需要的。)

所以,我得到的是我想要的 1 或 2 个,但不是这 2 个加在一起。

  • 如何使用这种结构实现这一点(正确配置映射路径)?
4

5 回答 5

25

默认情况下,@SubscribeMapping 方法的返回值作为消息直接发送回连接的客户端,并且不通过代理

(强调我的)

这里 Spring Framework 文档描述的是响应消息发生了什么,而不是传入SUBSCRIBE消息。

所以回答你的问题:

  • 是的,客户订阅了该主题
  • 是的,如果您使用该主题发送消息,订阅该主题的客户端将收到一条消息
  • 消息代理负责管理订阅

有关订阅管理的更多信息

使用SimpleMessageBroker,消息代理实现存在于您的应用程序实例中。订阅注册由DefaultSubscriptionRegistry. 接收消息时,SimpleBrokerMessageHandler处理SUBSCRIPTION消息并注册订阅(请参阅此处的实现)。

对于像 RabbitMQ 这样的“真实”消息代理,您已经配置了一个将消息转发到代理的 Stomp 代理中继。在这种情况下,SUBSCRIBE消息被转发到代理,负责管理订阅(参见这里的实现)。

更新 - 更多关于 STOMP 消息流

如果您查看STOMP message flow 上的参考文档,您会看到:

  • 对“/topic/greeting”的订阅通过“clientInboundChannel”并转发给代理
  • 发送到“/app/greeting”的问候语通过“clientInboundChannel”并转发到 GreetingController。控制器添加当前时间,返回值通过“brokerChannel”作为消息传递给“/topic/greeting”(根据约定选择目的地,但可以通过@SendTo 覆盖)。

所以这里/topic/hello是代理目的地;在那里发送的消息直接转发到代理。While是应用程序目的地,除非另有说明,否则/app/hello应该生成要发送到的消息。/topic/hello@SendTo

现在你更新的问题在某种程度上是一个不同的问题,如果没有更精确的用例,很难说哪种模式最能解决这个问题。这里有几个:

  • 您希望客户端在发生某些事情时异步地意识到:订阅特定主题/topic/hello
  • 您要广播消息:向特定主题发送消息/topic/hello
  • 您想立即获得反馈,例如初始化应用程序的状态:订阅应用程序目标/app/hello,控制器立即响应消息
  • 您想将一条或多条消息发送到任何应用程序目标/app/hello:使用组合@MessageMapping@SendTo消息模板。

如果您想要一个好的示例,请查看这个聊天应用程序,该应用程序展示了带有真实世界用例的 Spring websocket 功能日志

于 2015-03-17T09:55:59.393 回答
18

所以两者都有:

  • 使用主题处理订阅
  • 在该主题上使用 @SubscribeMapping 来提供连接响应

不像你所经历的那样工作(和我一样)。

解决您的情况的方法(就像我做的那样)是:

  1. 删除 @SubscribeMapping - 它仅适用于 /app 前缀
  2. 像你自然一样订阅 /topic(没有 /app 前缀)
  3. 实现一个 ApplicationListener

    1. 如果您想直接回复单个客户端,请使用用户目的地(请参阅websocket-stomp-user-destination 或者您也可以订阅子路径,例如 /topic/my-id-42 ,那么您可以向此发送消息子主题(我不知道您的确切用例,我的是我有专门的订阅,如果我想做广播,我会遍历它们)

    2. 收到 StompCommand.SUBSCRIBE 后,立即在 ApplicationListener 的 onApplicationEvent 方法中发送消息

订阅事件处理程序:

@Override
  public void onApplicationEvent(SessionSubscribeEvent event) {
      Message<byte[]> message = event.getMessage();
      StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
      StompCommand command = accessor.getCommand();
      if (command.equals(StompCommand.SUBSCRIBE)) {
          String sessionId = accessor.getSessionId();
          String stompSubscriptionId = accessor.getSubscriptionId();
          String destination = accessor.getDestination();
          // Handle subscription event here
          // e.g. send welcome message to *destination*
       }
  }
于 2015-07-21T14:47:24.227 回答
7

我遇到了同样的问题,当我同时订阅客户端/topic/app客户端时,最终切换到解决方案,缓冲处理程序上收到的所有内容,/topic直到/app-bound 下载所有聊天历史记录,这就是@SubscribeMapping返回的内容。然后我将所有最近的聊天条目与在 a 上收到的条目合并/topic- 在我的情况下可能有重复。

另一种工作方法是声明

registry.enableSimpleBroker("/app", "/topic");
registry.setApplicationDestinationPrefixes("/app", "/topic");

显然,并不完美。但工作:)

于 2017-05-05T09:14:23.637 回答
5

嗨 Mert 虽然您的问题是 4 年前提出的,但我仍然会尝试回答,因为我最近在同一个问题上摸不着头脑并最终解决了它。

这里的关键部分是@SubscribeMapping一次性请求-响应交换,因此try1()控制器中的方法只会在客户端代码运行后立即触发一次

stompClient.subscribe('/topic/greetings', callback)

之后就没有办法触发try1()stompClient.send(...)

这里的另一个问题是控制器是应用程序消息处理程序的一部分,它接收带有前缀的目的地/app,所以为了到达@SubscribeMapping("/topic/greetings")你实际上必须编写这样的客户端代码

stompClient.subscribe('/app/topic/greetings', callback)

因为通常topic与经纪人映射以避免歧义,所以我建议将您的代码修改为

@SubscribeMapping("/greetings")

stompClient.subscribe('/app/greetings', callback)

现在console.log('RECEIVED !!!')应该可以了。

官方文档@SubscribeMapping还推荐了初始 UI 渲染的用例场景。

这什么时候有用?假设代理映射到 /topic 和 /queue,而应用程序控制器映射到 /app。在此设置中,代理存储了对 /topic 和 /queue 的所有订阅,这些订阅旨在用于重复广播,并且应用程序无需参与其中。客户端还可以订阅某个 /app 目的地,并且控制器可以返回一个值来响应该订阅,而不涉及代理,而无需再次存储或使用订阅(实际上是一次性的请求-回复交换)。一个用例是在启动时使用初始数据填充 UI。

于 2019-08-28T07:59:12.877 回答
2

也许它并不完全相关,但是当我订阅“app/test”时,无法接收发送到“app/test”的消息。

所以我发现添加经纪人是问题所在(不知道为什么顺便说一句)。

所以这是我之前的代码:

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic");
    }

后 :

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        // problem line deleted
    }

现在,当我订阅“应用程序/测试”时,这是有效的:

    template.convertAndSend("/app/test", stringSample);

就我而言,我不需要更多。

于 2019-04-25T15:33:55.153 回答