目前,我正在测试使用 Spring Integration 将同一个 Spring-Boot 应用程序中的分散模块和服务连接到一个统一的流中,从一个单一入口点开始。
如果可能的话,我正在寻找有关 Spring Integration 的以下说明:
- 下面的代码是使用 DSL 构建流的正确方法吗?
- 在下面的“C”中,我可以将结果冒泡到“B”流吗?
- 使用 DSL 与 XML 是更好的方法吗?
- 我对如何正确“终止”流程感到困惑?
流程概述
在下面的代码中,我只是将一个页面发布到一个目的地。整体流程是这样的。
- 发布者流侦听有效负载并将其拆分为多个部分。
- 内容流过滤掉页面并将它们分成几部分。
- AWS 流订阅并处理该部分。
- 文件流订阅和处理部分。
最终,发布者流程中可能会有其他非常不同类型的消费者,它们不是内容,这就是我将发布者与内容分开的原因。
A)发布流程(publisher.jar):
这是我通过网关发起的“主要”流程。目的是作为开始触发所有发布流程的入口点。
- 接收消息
- 预处理消息并保存。
- 将有效负载拆分为其中包含的各个条目。
- 用其余数据丰富每个条目
- 将每个条目放在输出通道上。
下面是代码:
@Bean
IntegrationFlow flowPublish()
{
return f -> f
.channel(this.publishingInputChannel())
//Prepare the payload
.<Package>handle((p, h) -> this.save(p))
//Split the artifact resolved items
.split(Package.class, Package::getItems)
//Find the artifact associated to each item (if available)
.enrich(
e -> e.<PackageEntry>requestPayload(
m ->
{
final PackageEntry item = m.getPayload();
final Publishable publishable = this.findPublishable(item);
item.setPublishable(publishable);
return item;
}))
//Send the results to the output channel
.channel(this.publishingOutputChannel());
}
B) 内容流 (content.jar)
该模块的职责是处理传入的“内容”有效负载(即本例中的页面)并将它们拆分/路由到适当的订阅者。
- 监听发布者输出通道
- 仅按页面类型过滤条目
- 将原始有效负载添加到标头以供以后使用
- 将有效负载转换为实际类型
- 将页面拆分为各个元素(块)
- 将每个元素路由到适当的 PubSub 频道。
至少目前,订阅的流不会返回任何响应——它们应该只是触发并忘记,但我想知道在使用 pub-sub 通道时如何使结果冒泡。
下面是代码:
@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
return MessageChannels.publishSubscribe("assetPublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
return MessageChannels.publishSubscribe("pagePublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.publishingChannel)
//Filter for root pages (which contain elements)
.filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
//Put the publishable details in the header
.enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
//Transform the item to a Page
.transform(PackageEntry.class, PackageEntry::getPublishable)
//Split page into components and put the type in the header
.split(Page.class, this::splitPageElements)
//Route content based on type to the subscriber
.<PageContent, String>route(PageContent::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
.subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
.defaultOutputToParentFlow())
.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
C) AWS 内容 (aws-content.jar)
该模块是内容特定流的众多潜在订阅者之一。它根据上面发布的路由通道单独处理每个元素。
- 订阅相应的频道。
- 适当地处理动作。
可以有多个具有订阅上述路由输出通道的流的模块,这只是其中之一。
例如,“contentPageChannel”可以调用下面的 flowPageToS3(在 aws 模块中)以及一个 flowPageToFile(在另一个模块中)。
下面是代码:
@Bean
IntegrationFlow flowAssetToS3()
{
return flow -> flow
.channel(this.assetChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<PageContent>handle((p, h) ->
{
return this.publishS3Asset(p);
})));
}
@Bean
IntegrationFlow flowPageToS3()
{
return flow -> flow
.channel(this.pageChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this.publishS3Page(p))
.enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
.handle(this.s3MessageHandler())));
}