2

我需要实现以下架构:

  1. 我有必须使用 JMS 发送到系统(某些外部应用程序)的数据。

  2. 根据您需要仅发送到必要系统的数据(例如,如果系统数量为 4,那么您可以从 1 发送到 4 )

  3. 需要等待消息发送到的系统的响应,在收到所有响应后,需要处理接收到的数据(或至少处理一次超时)

  4. 相关 ID 包含在传出和传入 JMS 消息的标头中

  5. 每个新的此类进程都可以异步并行启动

现在我只在 Spring JMS 的帮助下实现了它。我手动同步线程,也手动管理线程池。

有关发送消息的系统的相关 ID 和信息被存储为状态,并在接收到新消息等后更新它。

但我想简化逻辑并使用 Spring 集成 Java DSL、Scatter 聚集模式(这只是我的情况)和其他有用的 Spring 特性。

你能帮我展示一个如何在 Spring-integration/IntregrationFlow 的帮助下实现这种架构的例子吗?

4

1 回答 1

1

以下是我们的测试用例中的一些示例:

    @Bean
    public IntegrationFlow scatterGatherFlow() {
        return f -> f
                .scatterGather(scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
                                .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10))
                                .recipientFlow(m -> true, sf -> sf.handle((p, h) -> Math.random() * 10)),
                        gatherer -> gatherer
                                .releaseStrategy(group ->
                                        group.size() == 3 ||
                                                group.getMessages()
                                                        .stream()
                                                        .anyMatch(m -> (Double) m.getPayload() > 5)),
                        scatterGather -> scatterGather
                                .gatherTimeout(10_000));
    }

所以,有部分:

  • scatterer- 向收件人发送消息。在您的情况下,所有这些 JMS 服务。那可能是一个scatterChannel。通常PublishSubscribeChannel,因此Scatter-Gather可能事先不知道订阅者。

  • gatherer- 好吧,它只是一个aggregator具有所有可能选项的选项。

  • scatterGather- 只是为了方便ScatterGatherHandler和公共端点选项的直接属性。
于 2017-08-11T15:28:30.597 回答