1

所以我一直在阅读有关使用 RabbitMQ 代理的 Spring Message Relay(Spring Messaging 的东西)功能。我想要实现的目标如下:

有一个服务(1),它充当rabbitmq和浏览器之间的消息中继。这现在工作正常。我正在使用 MessageBrokerRegistry.enableStompBrokerRelay 来做到这一点。

在后端有另一个服务 (2),它将向 RabbitMQ 上的已知队列发送消息,并将该消息路由到特定用户。作为发件人,我想控制消息被传递给谁。

通常,您会使用 SimpMessagingTemplate 来执行此操作。但问题是,消息的来源实际上无法访问该模板,因为它没有充当中继,没有使用 websockets,也没有保存队列名称到会话 ID 的映射。

我能想到的一种方法是在服务 1 上编写一个简单的类,它将侦听所有队列并使用 simp 模板转发它们。但是我摔倒了,这不是一个理想的方法,我觉得可能已经有一种方法可以使用 Spring 来实现它。

你能给些建议么?

4

1 回答 1

1

这个问题让我想到了我面临的同样困境。我已经开始使用自定义的UserDestinationResolver,它达到了一致的主题命名方案,该方案仅使用用户名而不是默认解析器使用的会话 ID。

这让我可以在 JS 中订阅“/user/exchange/amq.direct/current-time”,但通过 vanilla RabbitMQ 应用程序发送到“/exchange/amqp.direct/users.me.current-time”(发送给名为“我”)。

最新的源代码在这里,我将它“注册”为我拥有的现有 @Configuration 类中的 @Bean 。

这是自定义UserDestinationResolver本身:

public class ConsistentUserDestinationResolver implements UserDestinationResolver {
    private static final Pattern USER_DEST_PREFIXING_PATTERN =
            Pattern.compile("/user/(?<name>.+?)/(?<routing>.+)/(?<dest>.+?)");

    private static final Pattern USER_AUTHENTICATED_PATTERN =
            Pattern.compile("/user/(?<routing>.*)/(?<dest>.+?)");

    @Override
    public UserDestinationResult resolveDestination(Message<?> message) {
        SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);

        final String destination = accessor.getDestination();
        final String authUser = accessor.getUser() != null ? accessor.getUser().getName() : null;

        if (destination != null) {
            if (SimpMessageType.SUBSCRIBE.equals(accessor.getMessageType()) ||
                    SimpMessageType.UNSUBSCRIBE.equals(accessor.getMessageType())) {
                if (authUser != null) {
                    final Matcher authMatcher = USER_AUTHENTICATED_PATTERN.matcher(destination);
                    if (authMatcher.matches()) {
                        String result = String.format("/%s/users.%s.%s",
                                authMatcher.group("routing"), authUser, authMatcher.group("dest"));
                        UserDestinationResult userDestinationResult =
                                new UserDestinationResult(destination, Collections.singleton(result), result, authUser);
                        return userDestinationResult;
                    }
                }
            }
            else if (accessor.getMessageType().equals(SimpMessageType.MESSAGE)) {
                final Matcher prefixMatcher = USER_DEST_PREFIXING_PATTERN.matcher(destination);
                if (prefixMatcher.matches()) {
                    String user = prefixMatcher.group("name");
                    String result = String.format("/%s/users.%s.%s",
                            prefixMatcher.group("routing"), user, prefixMatcher.group("dest"));
                    UserDestinationResult userDestinationResult =
                            new UserDestinationResult(destination, Collections.singleton(result), result, user);
                    return userDestinationResult;
                }
            }
        }

        return null;
    }
}
于 2017-02-20T22:17:17.570 回答