1

我必须读取一个文件并根据第一列拆分每一行和组行,当第一列值发生变化时,我必须释放前一个组。这可以在 Spring 集成 DSL 中完成吗?

这是文件的外观,并且已排序:

x 1 
x 2
x 3
y 4
y 5
y 6

输出应该是x = 1, 2, 3和y = 4, 5, 6的两条消息。由于这没有任何其他关系,关于何时应该对消息进行分组,我可以一击就分组消息吗下一个不匹配的记录?在这种情况下,当我在第 4 行点击“y”时,将之前的“x”消息分组并释放它?是否可以使用自定义聚合器?

4

1 回答 1

0

最简单的解决方案是groupTimeout()尽可能快地在单个线程中拆分和聚合。因此,您的所有记录都将被处理并分发给他们的组。但由于我们不知道如何释放它们,我们将依赖于一些预定的超时。因此,聚合器的配置将如下所示:

 .aggregate(a -> a
            .correlationExpression("payload.column1")
            .releaseStrategy(g -> false)
            .groupTimeout(1000)
            .sendPartialResultOnExpiry(true)
            .outputProcessor(g -> {
                Collection<Message<?>> messages = g.getMessages();

                // iterate and build your output payload                                

                })
            )
于 2017-09-18T21:20:24.317 回答