0

我已经用一个典型的简单流程成功地评估了 Spring Cloud DataFlow:source | 处理器 | 下沉。

对于部署,将有多个源输入此管道,我可以使用数据流标签来完成。一切都很好。

每个源都是一个不同的 rabbitmq 实例,并且因为处理器需要知道消息来自哪里(因为它必须回调源系统以获取更多信息),所以我想到的策略是用 header 来丰富每个消息有关源系统的详细信息,然后透明地传递给处理器。

现在,我精通 Spring、Spring Boot 和 Spring Integration,但我不知道如何丰富数据流组件中的每条消息。

源组件绑定到 org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration。源使用默认的 Source.OUTPUT 通道。如何获取源中的每条消息以丰富它?

我的处理器组件使用一些 Spring Integration DSL 来完成它需要做的一些事情,但是这个处理器组件根据定义同时具有 INPUT 和 OUTPUT 通道。RabbitSourceConfiguration 源并非如此。

那么,这可以做到吗?

4

2 回答 2

0

我认为您需要在 RabbitSourceConfigurationMessageListener上进行自定义。MessageListenerContainer

RabbitSourceConfiguration你可以设置一个自定义ChannelAwareMessageListener(你也可以从它扩展MessagingMessageListenerAdapterMessageListenerContainer来做你想做的事情。

于 2016-11-23T11:39:29.467 回答
0

最后,将 org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration 子类化为:

  • 覆盖 public SimpleMessageListenerContainer container() 以便我可以在调用 super.container() 之前插入自定义运行状况检查。我的业务逻辑通过消息来自何处的详细信息来丰富每条消息(参见下一个项目符号)(注意,这是消息的发布者,而不是兔子队列)。需要进行健康检查来验证额外的丰富信息(通过配置提供),以确保不会从队列中消耗消息并使用错误的信息进行丰富。如果验证失败,源组件将无法启动,因此不会消耗任何消息。

  • 覆盖 AmqpInboundChannelAdapter bean 的创建,以便可以在适配器上设置 DefaultAmqpHeaderMapper 的自定义子类。此自定义映射器在公共 Map toHeadersFromRequest(final MessageProperties source) 中添加丰富的标头。

对我来说,流/数据流无法拦截和修改 Source 组件中的消息是有问题的。我真的不应该像以前那样摆弄底层消息代理 API。我应该能够使用例如 Spring Integration 来做到这一点。事实上,我可以注册一个全局消息拦截器,但我不能更改消息的标题。

这种能力会出现在我的 WIBNI(如果不是很好)列表中。也许我会提出这个要求。

于 2017-04-04T08:52:47.733 回答