我正在开发网络挂钩通知服务,允许客户端订阅/取消订阅流经中间件的消息,并通过将消息有效负载发布到提供的回调 URL 来获得有关消息的通知(根据提供的标准)。消息传递如下所示:
flowBuilder
.enrichHeaders(e->e.header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE,true))
.handle(Http.outboundChannelAdapter(message-> {
String subscriptionId = message.getHeaders().get(SUBSCRIPTION_ID_HEADER_NAME, String.class);
return subscriptionsStore.get(UUID.fromString(subscriptionId)).getCallbackUrl(); //potential NPE if subscription was removed
},restTemplateBuilder.build())
.get()
如您所见,uriFunctionsubscriptionsStore
的实现通过订阅 id(消息头的一部分)获取回调 URL 。
我的问题是关于客户已经使用他的订阅 ID 取消订阅并且我在条件处理程序之后的情况。
filter
我知道我可以过滤订阅 id 仍然存在于订阅商店中的消息,但这不是正确的解决方案,因为客户端可能会handle
在uriFunction
.
另一种解决方案是使用回调 URL 到达标头,然后通过具有非空值的标头进行过滤,但我不想损害原始消息的标头和有效负载。
我可以考虑另一种方法:将不存在的订阅的 URI 计算为一些静态值,并添加拦截器RestTempalte
来模拟此特定 URI 值的 HTTP OK 重播...
所以我的问题是关于使用标准 EIP 或其他我不知道的 Spring 集成功能来处理这种情况的正确方法......
谢谢
更新
我添加了DedicatedMessage
包含上下文的类:
public static class DedicatedMessage extends GenericMessage<Object> implements MessageDecorator{
@Getter
@Transient
private Subscription subscription;
public DedicatedMessage(Subscription subscription,Object payload,Map<String,Object> headers) {
super(payload,headers);
this.subscription = subscription;
}
@Override
public Message<?> decorateMessage(Message<?> message) {
return new DedicatedMessage (subscription,message.getPayload(),message.getHeaders());
}
}
并将流程更改为:
flowBuilder
.enrichHeaders(e->e.header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE,true))
.handle((payload, headers) -> {
String subscriptionId = (String) headers.get(SUBSCRIPTION_ID_HEADER_NAME);
Subscription subscription = subscriptionsCache.get(UUID.fromString(subscriptionId));
return Optional.ofNullable(subscription)
.map(s-> new DedicatedMessage(s, payload,headers))
.orElse(null);
})
.handle(Http.outboundChannelAdapter(message->((DedicatedMessage)message).getSubscription().getCallbackUrl()
,restTemplateBuilder.build())
.get()
这个方法有什么问题吗?