1

我正在开发网络挂钩通知服务,允许客户端订阅/取消订阅流经中间件的消息,并通过将消息有效负载发布到提供的回调 URL 来获得有关消息的通知(根据提供的标准)。消息传递如下所示:

flowBuilder
    .enrichHeaders(e->e.header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE,true))
    .handle(Http.outboundChannelAdapter(message-> {
              String subscriptionId = message.getHeaders().get(SUBSCRIPTION_ID_HEADER_NAME, String.class);
                return subscriptionsStore.get(UUID.fromString(subscriptionId)).getCallbackUrl(); //potential NPE if subscription was removed 

            },restTemplateBuilder.build())

.get()

如您所见,uriFunctionsubscriptionsStore的实现通过订阅 id(消息头的一部分)获取回调 URL 。

我的问题是关于客户已经使用他的订阅 ID 取消订阅并且我在条件处理程序之后的情况。

filter我知道我可以过滤订阅 id 仍然存在于订阅商店中的消息,但这不是正确的解决方案,因为客户端可能会handleuriFunction.

另一种解决方案是使用回调 URL 到达标头,然后通过具有非空值的标头进行过滤,但我不想损害原始消息的标头和有效负载。

我可以考虑另一种方法:将不存在的订阅的 URI 计算为一些静态值,并添加拦截器RestTempalte来模拟此特定 URI 值的 HTTP OK 重播...

所以我的问题是关于使用标准 EIP 或其他我不知道的 Spring 集成功能来处理这种情况的正确方法......

谢谢

更新

我添加了DedicatedMessage包含上下文的类:

public static class DedicatedMessage  extends GenericMessage<Object> implements MessageDecorator{
        @Getter
        @Transient
        private Subscription subscription;


        public DedicatedMessage(Subscription subscription,Object payload,Map<String,Object> headers) {
            super(payload,headers);
            this.subscription = subscription;
        }

        @Override
        public Message<?> decorateMessage(Message<?> message) {
            return new DedicatedMessage (subscription,message.getPayload(),message.getHeaders());
        }
    }

并将流程更改为:

flowBuilder
   .enrichHeaders(e->e.header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE,true))
  .handle((payload, headers) -> {
                        String subscriptionId = (String) headers.get(SUBSCRIPTION_ID_HEADER_NAME);
                        Subscription subscription = subscriptionsCache.get(UUID.fromString(subscriptionId));
                        return Optional.ofNullable(subscription)
                                .map(s->  new DedicatedMessage(s, payload,headers))
                                .orElse(null);
                    })
  .handle(Http.outboundChannelAdapter(message->((DedicatedMessage)message).getSubscription().getCallbackUrl()
                            ,restTemplateBuilder.build())
.get()

这个方法有什么问题吗?

4

1 回答 1

1

我不确定你的NRE缩写是什么意思,但你可以NoSuchSubscriptionException从你的subscriptionsStore.get()方法中抛出一个,然后在ExpressionEvaluatingRequestHandlerAdvice应用到其endpoint.advice()链中的出站通道适配器中忽略/报告该异常。

于 2017-11-05T17:15:05.783 回答