2

目前,我正在测试使用 Spring Integration 将同一个 Spring-Boot 应用程序中的分散模块和服务连接到一个统一的流中,从一个单一入口点开始。

如果可能的话,我正在寻找有关 Spring Integration 的以下说明:

  1. 下面的代码是使用 DSL 构建流的正确方法吗?
  2. 在下面的“C”中,我可以将结果冒泡到“B”流吗?
  3. 使用 DSL 与 XML 是更好的方法吗?
  4. 我对如何正确“终止”流程感到困惑?

流程概述

在下面的代码中,我只是将一个页面发布到一个目的地。整体流程是这样的。

  1. 发布者流侦听有效负载并将其拆分为多个部分。
  2. 内容流过滤掉页面并将它们分成几部分。
    1. AWS 流订阅并处理该部分。
    2. 文件流订阅和处理部分。

最终,发布者流程中可能会有其他非常不同类型的消费者,它们不是内容,这就是我将发布者与内容分开的原因。

A)发布流程(publisher.jar):

这是我通过网关发起的“主要”流程。目的是作为开始触发所有发布流程的入口点。

  1. 接收消息
  2. 预处理消息并保存。
  3. 将有效负载拆分为其中包含的各个条目。
  4. 用其余数据丰富每个条目
  5. 将每个条目放在输出通道上。

下面是代码:

@Bean
IntegrationFlow flowPublish()
{
    return f -> f
        .channel(this.publishingInputChannel())
        //Prepare the payload
        .<Package>handle((p, h) -> this.save(p))
        //Split the artifact resolved items
        .split(Package.class, Package::getItems)
        //Find the artifact associated to each item (if available)
        .enrich(
            e -> e.<PackageEntry>requestPayload(
                m ->
                {
                    final PackageEntry item = m.getPayload();
                    final Publishable publishable = this.findPublishable(item);
                    item.setPublishable(publishable);
                    return item;
                }))
        //Send the results to the output channel
        .channel(this.publishingOutputChannel());
}

B) 内容流 (content.jar)

该模块的职责是处理传入的“内容”有效负载(即本例中的页面)并将它们拆分/路由到适当的订阅者。

  1. 监听发布者输出通道
  2. 仅按页面类型过滤条目
  3. 将原始有效负载添加到标头以供以后使用
  4. 将有效负载转换为实际类型
  5. 将页面拆分为各个元素(块)
  6. 将每个元素路由到适当的 PubSub 频道。

至少目前,订阅的流不会返回任何响应——它们应该只是触发并忘记,但我想知道在使用 pub-sub 通道时如何使结果冒泡。

下面是代码:

@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
    return MessageChannels.publishSubscribe("assetPublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
    return MessageChannels.publishSubscribe("pagePublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
IntegrationFlow flowPublishContent()
{
    return flow -> flow
        .channel(this.publishingChannel)
        //Filter for root pages (which contain elements)
        .filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
        //Put the publishable details in the header
        .enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
        //Transform the item to a Page
        .transform(PackageEntry.class, PackageEntry::getPublishable)
        //Split page into components and put the type in the header
        .split(Page.class, this::splitPageElements)
        //Route content based on type to the subscriber
        .<PageContent, String>route(PageContent::getType, mapping -> mapping
            .resolutionRequired(false)
            .subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
            .subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
            .defaultOutputToParentFlow())
        .channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}

C) AWS 内容 (aws-content.jar)

该模块是内容特定流的众多潜在订阅者之一。它根据上面发布的路由通道单独处理每个元素。

  1. 订阅相应的频道。
  2. 适当地处理动作。

可以有多个具有订阅上述路由输出通道的流的模块,这只是其中之一。

例如,“contentPageChannel”可以调用下面的 flowPageToS3(在 aws 模块中)以及一个 flowPageToFile(在另一个模块中)。

下面是代码:

@Bean
IntegrationFlow flowAssetToS3()
{
    return flow -> flow
        .channel(this.assetChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<PageContent>handle((p, h) ->
                                     {
                                         return this.publishS3Asset(p);
                                     })));
}

@Bean
IntegrationFlow flowPageToS3()
{
    return flow -> flow
        .channel(this.pageChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<Page>handle((p, h) -> this.publishS3Page(p))
                .enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
                .handle(this.s3MessageHandler())));
}
4

1 回答 1

2

First of all there are a lot of content in your question: it's to hard to keep all the info during read. That is your project, so you should be very confident in the subject. But for us that is something new and may just give up even reading not talking already with attempt to answer.

Anyway I'll try to answer to your questions in the beginning, although I feel like you're going to start a long discussion "what?, how?, why?"...

Is the below code the right way to structure flows using the DSL?

It really depends of your logic. That is good idea to distinguish it between logical component, but that might be overhead to sever separate jar on the matter. Looking to your code that seems for me like you still collect everything into single Spring Boot application and just @Autowired appropriate channels to the @Configuration. So, yes, separate @Configuration is good idea, but separate jar is an overhead. IMHO.

In "C" below, can i bubble up the result to the "B" flow?

Well, since the story is about publish-subscribe that is really unusual to wait for reply. How many replies are you going to get from those subscribers? Right, that is the problem - we can send to many subscribers, but we can't get replies from all of them to single return. Let's come back to Java code: we can have several method arguments, but we have only one return. The same is applied here in Messaging. Anyway you may take a look into Scatter-Gather pattern implementation.

Is using the DSL vs. the XML the better approach?

Both are just a high-level API. Underneath there are the same integration components. Looking to your app you'd come to the same distributed solution with the XML configuration. Don't see reason to step back from the Java DSL. At least it is less verbose, for you.

I am confused as to how to correctly "terminate" a flow?

That's absolutely unclear having your big description. If you send to S3 or to File, that is a termination. There is no reply from those components, so no where to go, nothing to do. That is just stop. The same we have with the Java method with void. If you worry about your entry point gateway, so just make it void and don't wait for any replies. See Messaging Gateway for more info.

于 2017-06-27T15:21:24.593 回答