0

您好,我有一个集成流程,它逐行拆分文件,将每一行转换为 POJO,然后通过 JDBC 出站网关将该 POJO 插入到数据库中。

一旦文件处理完成,我希望能够发送一封电子邮件。我目前在我的 jdbcOutboundGateway 之后发送到 smtpFlow 通道,但是这是在每次插入数据库后发送一封电子邮件。

这是我当前的流量 DSL

IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .split(splitFile())
            .transform(this::transformToIndividualScore)
            .handle(jdbcOutboundGateway(null))
            .channel("smtpFlow")
            .get();

在处理完所有文件后,如何让此流程仅发送一封电子邮件jdbcOutboundGateway

这是我的splitFile()方法

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(true, false);
    fs.setFirstLineAsHeader("IndividualScore");
    return fs;

这是我的transformToIndividualScore方法

@Transformer
private IndividualScore transformToIndividualScore(String payload) {
    String[] values = payload.split(",");
    IndividualScore is = new IndividualScore();
    is.setScorecardDate(values[0]);
    is.setVnSpId(values[1]);
    is.setPrimaryCat(values[2]);
    is.setSecondaryCat(values[3]);
    is.setScore(Integer.parseInt(values[4]));
    is.setActual(values[5]);
    return is;
}
4

3 回答 3

0

在句柄之后添加.aggregate()以将拆分的结果组合回单个消息。

于 2020-01-29T15:00:43.203 回答
0

所以解决我的问题,(有点)。

false在 my now 上标记迭代器FileSplitter允许排序标题。

更新splitFile()如下

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(false, false);
    fs.setFirstLineAsHeader("IndividualScore");
    fs.setApplySequence(true);
    return fs;
}

我的直觉告诉我,默认的.aggregate()发布策略一定是消息头sequenceSize==聚合的消息列表。

创建FileSplitteriterator设置truesequenceSize设置为0永远不会满足默认的发布策略.aggregate()

但是,这使得FileSplitter使用 aList将文件的所有行存储在内存中。聚合器还在内存中存储另一ArrayList行。

是否有更好的解决方案来创建一个自定义聚合器来处理END FileMarker允许使用迭代器来拆分文件?

于 2020-01-29T17:18:58.997 回答
0

在@ArtemBilan 的帮助下

我能够按顺序使用publishSubscribeChannel()and链2方法是新的subscribe()IntegrationFlow

 @Bean
IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .publishSubscribeChannel(channel -> channel
                    .subscribe(
                        a -> a
                                .split(splitFile())
                                .transform(this::transformToIndividualScore)
                                .handle(jdbcMessageHandler(null)))
                    .subscribe(
                        b -> b
                                .transform(this::transformToSuccessEmail)
                                .handle(emailHandler()))
            )
            .get();
于 2020-01-29T19:05:23.103 回答