1

我是 Spring 集成的新手。我正在尝试使用文件拆分器从文件中拆分消息,然后使用 .aggregate() 构建单个消息并发送到输出通道。我将标记设为 true,因此 apply-sequence 现在默认为 false。我使用enrichHeaders 将correlationId 设置为常数“1”。我无法设置释放策略,因为我没有保留序列结束。这是我的代码的外观。

    IntegrationFlows
                .from(s -> s.file(new File(fileDir))
                                .filter(getFileFilter(fileName)),
                        e -> e.poller(poller))

                .split(Files.splitter(true, true)
                                .charset(StandardCharsets.US_ASCII),
                        e -> e.id(beanName)).enrichHeaders(h -> h.header("correlationId", "1"));

 IntegrationFlow integrationFlow = integrationFlowBuilder
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();

@Bean
    public IntegrationFlow itemExcludes() {
        return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
                .aggregate(aggregator -> aggregator
                        .outputProcessor(group -> group.getMessages()
                        .stream()
                        .map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
                        .collect(Collectors.joining(","))))
                .transform(Transformers.toJson())
                .channel(customSource.itemExclude());
    }

    @Bean
    public IntegrationFlow itemExcludeMarkers() {
        return flow -> flow
                .log(LoggingHandler.Level.INFO)
                .<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
                .<FileHandler>handle(new FileHandler(configProps))
                .channel(NULL_CHANNEL);
    }

任何帮助表示赞赏。

4

2 回答 2

1

correlationId我会在之前移动您的标题丰富器splitter并使其如下所示:

 .enrichHeaders(h -> h
        .headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, 
                   m -> m.getHeaders().getId())) 

常量correlationId在多线程环境下绝对不好用:不同的线程分割不同的文件,将不同的行发送到同一个聚合器。因此,使用"1"as 关联键,您将始终拥有一组来聚合和发布。默认序列行为是将原始消息填充idcorrelationId. 由于您不会依赖我建议的简单解决方案来模拟该行为applySequenceFileSplitter

正如加里在他的回答中指出的那样,您需要考虑自定义ReleaseStrategy并发FileSplitter.FileMarker送给聚合器。可以与 比较的FileSplitter.FileMarker.END具有属性来决定我们是否可以释放该组。在构建输出结果期间确实必须过滤消息。lineCountMessageGroup.sizeMessageGroupProcessorFileSplitter.FileMarker

于 2017-09-15T13:17:37.507 回答
0

使用在最后一条消息中查找 END 标记的自定义发布策略,也许还可以使用从集合中删除标记的自定义输出处理器。

于 2017-09-15T13:03:50.587 回答