我必须读取一个文件并根据第一列拆分每一行和组行,当第一列值发生变化时,我必须释放前一个组。这可以在 Spring 集成 DSL 中完成吗?
这是文件的外观,并且已排序:
x 1
x 2
x 3
y 4
y 5
y 6
输出应该是x = 1, 2, 3和y = 4, 5, 6的两条消息。由于这没有任何其他关系,关于何时应该对消息进行分组,我可以一击就分组消息吗下一个不匹配的记录?在这种情况下,当我在第 4 行点击“y”时,将之前的“x”消息分组并释放它?是否可以使用自定义聚合器?
我必须读取一个文件并根据第一列拆分每一行和组行,当第一列值发生变化时,我必须释放前一个组。这可以在 Spring 集成 DSL 中完成吗?
这是文件的外观,并且已排序:
x 1
x 2
x 3
y 4
y 5
y 6
输出应该是x = 1, 2, 3和y = 4, 5, 6的两条消息。由于这没有任何其他关系,关于何时应该对消息进行分组,我可以一击就分组消息吗下一个不匹配的记录?在这种情况下,当我在第 4 行点击“y”时,将之前的“x”消息分组并释放它?是否可以使用自定义聚合器?
最简单的解决方案是groupTimeout()
尽可能快地在单个线程中拆分和聚合。因此,您的所有记录都将被处理并分发给他们的组。但由于我们不知道如何释放它们,我们将依赖于一些预定的超时。因此,聚合器的配置将如下所示:
.aggregate(a -> a
.correlationExpression("payload.column1")
.releaseStrategy(g -> false)
.groupTimeout(1000)
.sendPartialResultOnExpiry(true)
.outputProcessor(g -> {
Collection<Message<?>> messages = g.getMessages();
// iterate and build your output payload
})
)